You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/08/26 05:33:59 UTC
[dolphinscheduler] branch dev updated: Refactor worker execute task process (#11540)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 1b120e3a59 Refactor worker execute task process (#11540)
1b120e3a59 is described below
commit 1b120e3a5959a277c1b363dcd2d9cb88fcc32cc9
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Aug 26 13:33:51 2022 +0800
Refactor worker execute task process (#11540)
* Refactor worker execute task process
---
.../plugin/task/api/AbstractTask.java | 10 +-
.../plugin/task/api/AbstractYarnTask.java | 11 +-
.../plugin/task/api/k8s/AbstractK8sTask.java | 2 +-
.../plugin/task/chunjun/ChunJunTask.java | 23 +-
.../plugin/task/datax/DataxTask.java | 48 +--
.../plugin/task/dinky/DinkyTask.java | 95 +++---
.../dolphinscheduler/plugin/task/dvc/DvcTask.java | 14 +-
.../plugin/task/emr/EmrAddStepsTask.java | 17 +-
.../plugin/task/emr/EmrJobFlowTask.java | 6 +-
.../plugin/task/http/HttpTask.java | 5 +-
.../plugin/task/jupyter/JupyterTask.java | 24 +-
.../plugin/task/mlflow/MlflowTask.java | 16 +-
.../plugin/task/pigeon/PigeonTask.java | 12 +-
.../plugin/task/procedure/ProcedureTask.java | 12 +-
.../plugin/task/python/PythonTask.java | 2 +-
.../plugin/task/pytorch/PytorchTask.java | 9 +-
.../plugin/task/sagemaker/SagemakerTask.java | 5 +-
.../plugin/task/seatunnel/SeatunnelTask.java | 21 +-
.../plugin/task/shell/ShellTask.java | 19 +-
.../dolphinscheduler/plugin/task/sql/SqlTask.java | 15 +-
.../plugin/task/zeppelin/ZeppelinTask.java | 11 +-
.../plugin/task/zeppelin/ZeppelinTaskTest.java | 5 +-
.../worker/processor/TaskDispatchProcessor.java | 128 ++------
.../server/worker/processor/TaskKillProcessor.java | 27 +-
.../worker/processor/TaskSavePointProcessor.java | 19 +-
.../DefaultWorkerDelayTaskExecuteRunnable.java | 60 ++++
...faultWorkerDelayTaskExecuteRunnableFactory.java | 53 +++
.../server/worker/runner/TaskExecuteThread.java | 364 ---------------------
.../runner/WorkerDelayTaskExecuteRunnable.java | 61 ++++
.../WorkerDelayTaskExecuteRunnableFactory.java | 59 ++++
.../server/worker/runner/WorkerExecService.java | 8 +-
.../server/worker/runner/WorkerManagerThread.java | 60 ++--
.../worker/runner/WorkerTaskExecuteRunnable.java | 275 ++++++++++++++++
.../runner/WorkerTaskExecuteRunnableFactory.java | 23 ++
.../WorkerTaskExecuteRunnableFactoryBuilder.java | 50 +++
.../worker/utils/TaskExecutionCheckerUtils.java | 129 ++++++++
.../processor/TaskDispatchProcessorTest.java | 171 +++-------
.../DefaultWorkerDelayTaskExecuteRunnableTest.java | 73 +++++
.../worker/runner/TaskExecuteThreadTest.java | 87 -----
39 files changed, 1106 insertions(+), 923 deletions(-)
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
index 80ee36b9d1..0b23905059 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
@@ -28,9 +28,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -103,12 +101,8 @@ public abstract class AbstractTask {
return null;
}
- /**
- * task handle
- *
- * @throws Exception exception
- */
- public abstract void handle() throws Exception;
+ public abstract void handle() throws TaskException;
+
/**
* cancel application
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index 4f1913ba3d..c9a9c4851c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -20,8 +20,6 @@ package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
-import java.util.List;
-import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
@@ -51,7 +49,7 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
try {
// SHELL task exit code
TaskResponse response = shellCommandExecutor.run(buildCommand());
@@ -59,10 +57,15 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
// set appIds
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(response.getProcessId());
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ logger.info("The current yarn task has been interrupted", ex);
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ throw new TaskException("The current yarn task has been interrupted", ex);
} catch (Exception e) {
logger.error("yarn process failure", e);
exitStatusCode = -1;
- throw e;
+ throw new TaskException("Execute task failed", e);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
index 8d7f7eac7f..a82e5f6bbd 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
@@ -39,7 +39,7 @@ public abstract class AbstractK8sTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
try {
TaskResponse response = abstractK8sTaskExecutor.run(buildCommand());
setExitStatusCode(response.getExitStatusCode());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
index a416d0c02c..46fcf2ec2f 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
@@ -17,12 +17,12 @@
package org.apache.dolphinscheduler.plugin.task.chunjun;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
-
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.SystemUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@@ -32,9 +32,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.SystemUtils;
-
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -48,6 +45,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
+
/**
* chunjun task
*/
@@ -101,10 +101,10 @@ public class ChunJunTask extends AbstractTaskExecutor {
/**
* run chunjun process
*
- * @throws Exception exception
+ * @throws TaskException exception
*/
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
try {
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
@@ -115,10 +115,15 @@ public class ChunJunTask extends AbstractTaskExecutor {
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("The current ChunJun Task has been interrupted", e);
+ setExitStatusCode(EXIT_CODE_FAILURE);
+ throw new TaskException("The current ChunJun Task has been interrupted", e);
} catch (Exception e) {
logger.error("chunjun task failed.", e);
setExitStatusCode(EXIT_CODE_FAILURE);
- throw e;
+ throw new TaskException("Execute chunjun task failed", e);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index 08a8c8acc8..873d8f7424 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -17,32 +17,38 @@
package org.apache.dolphinscheduler.plugin.task.datax;
-import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
-
+import com.alibaba.druid.sql.ast.SQLStatement;
+import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
+import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
+import com.alibaba.druid.sql.ast.statement.SQLSelect;
+import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
+import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
+import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
+import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
+import com.alibaba.druid.sql.parser.SQLStatementParser;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.SystemUtils;
-
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -58,7 +64,6 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -66,17 +71,9 @@ import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.alibaba.druid.sql.ast.SQLStatement;
-import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
-import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
-import com.alibaba.druid.sql.ast.statement.SQLSelect;
-import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
-import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
-import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
-import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
-import com.alibaba.druid.sql.parser.SQLStatementParser;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
public class DataxTask extends AbstractTaskExecutor {
/**
@@ -150,7 +147,7 @@ public class DataxTask extends AbstractTaskExecutor {
* @throws Exception if error throws Exception
*/
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
try {
// replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
@@ -163,10 +160,15 @@ public class DataxTask extends AbstractTaskExecutor {
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("The current DataX task has been interrupted", e);
+ setExitStatusCode(EXIT_CODE_FAILURE);
+ throw new TaskException("The current DataX task has been interrupted", e);
} catch (Exception e) {
logger.error("datax task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
- throw e;
+ throw new TaskException("Execute DataX task failed", e);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
index 622a674780..987126b0a7 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
@@ -17,13 +17,17 @@
package org.apache.dolphinscheduler.plugin.task.dinky;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.MissingNode;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
@@ -39,10 +43,7 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.MissingNode;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
public class DinkyTask extends AbstractTaskExecutor {
@@ -77,47 +78,55 @@ public class DinkyTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
- String address = this.dinkyParameters.getAddress();
- String taskId = this.dinkyParameters.getTaskId();
- boolean isOnline = this.dinkyParameters.isOnline();
- JsonNode result;
- if (isOnline) {
- // Online dinky task, and only one job is allowed to execute
- result = onlineTask(address, taskId);
- } else {
- // Submit dinky task
- result = submitTask(address, taskId);
- }
- if (checkResult(result)) {
- boolean status = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("success").asBoolean();
- String jobInstanceId = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("jobInstanceId").asText();
- boolean finishFlag = false;
- while (!finishFlag) {
- JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
- if (!checkResult(jobInstanceInfoResult)) {
- break;
- }
- String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
- switch (jobInstanceStatus) {
- case DinkyTaskConstants.STATUS_FINISHED:
- final int exitStatusCode = mapStatusToExitCode(status);
- // Use address-taskId as app id
- setAppIds(String.format("%s-%s", address, taskId));
- setExitStatusCode(exitStatusCode);
- logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS));
- finishFlag = true;
- break;
- case DinkyTaskConstants.STATUS_FAILED:
- case DinkyTaskConstants.STATUS_CANCELED:
- case DinkyTaskConstants.STATUS_UNKNOWN:
- errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText());
- finishFlag = true;
+ public void handle() throws TaskException {
+ try {
+
+ String address = this.dinkyParameters.getAddress();
+ String taskId = this.dinkyParameters.getTaskId();
+ boolean isOnline = this.dinkyParameters.isOnline();
+ JsonNode result;
+ if (isOnline) {
+ // Online dinky task, and only one job is allowed to execute
+ result = onlineTask(address, taskId);
+ } else {
+ // Submit dinky task
+ result = submitTask(address, taskId);
+ }
+ if (checkResult(result)) {
+ boolean status = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("success").asBoolean();
+ String jobInstanceId = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("jobInstanceId").asText();
+ boolean finishFlag = false;
+ while (!finishFlag) {
+ JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
+ if (!checkResult(jobInstanceInfoResult)) {
break;
- default:
- Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
+ }
+ String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
+ switch (jobInstanceStatus) {
+ case DinkyTaskConstants.STATUS_FINISHED:
+ final int exitStatusCode = mapStatusToExitCode(status);
+ // Use address-taskId as app id
+ setAppIds(String.format("%s-%s", address, taskId));
+ setExitStatusCode(exitStatusCode);
+ logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS));
+ finishFlag = true;
+ break;
+ case DinkyTaskConstants.STATUS_FAILED:
+ case DinkyTaskConstants.STATUS_CANCELED:
+ case DinkyTaskConstants.STATUS_UNKNOWN:
+ errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText());
+ finishFlag = true;
+ break;
+ default:
+ Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
+ }
}
}
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ logger.error("Execute dinkyTask failed", ex);
+ setExitStatusCode(EXIT_CODE_FAILURE);
+ throw new TaskException("Execute dinkyTask failed", ex);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
index 8176999e4c..2e3bece309 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
@@ -17,11 +17,10 @@
package org.apache.dolphinscheduler.plugin.task.dvc;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@@ -30,6 +29,8 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+
/**
* shell task
*/
@@ -74,7 +75,7 @@ public class DvcTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
try {
// construct process
String command = buildCommand();
@@ -83,10 +84,15 @@ public class DvcTask extends AbstractTaskExecutor {
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
parameters.dealOutParam(shellCommandExecutor.getVarPool());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("The current DvcTask has been interrupted", e);
+ setExitStatusCode(EXIT_CODE_FAILURE);
+ throw new TaskException("The current DvcTask has been interrupted", e);
} catch (Exception e) {
logger.error("dvc task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
- throw e;
+ throw new TaskException("Execute dvc task failed", e);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
index d747577b71..6f4fc95e48 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
@@ -17,12 +17,6 @@
package org.apache.dolphinscheduler.plugin.task.emr;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-
-import java.util.HashSet;
-import java.util.concurrent.TimeUnit;
-
import com.amazonaws.SdkBaseException;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
@@ -36,6 +30,12 @@ import com.amazonaws.services.elasticmapreduce.model.StepState;
import com.amazonaws.services.elasticmapreduce.model.StepStatus;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Sets;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
/**
* AddJobFlowSteps task executor
@@ -62,7 +62,7 @@ public class EmrAddStepsTask extends AbstractEmrTask {
}
@Override
- public void handle() throws InterruptedException {
+ public void handle() throws TaskException {
StepStatus stepStatus = null;
try {
AddJobFlowStepsRequest addJobFlowStepsRequest = createAddJobFlowStepsRequest();
@@ -84,6 +84,9 @@ public class EmrAddStepsTask extends AbstractEmrTask {
} catch (EmrTaskException | SdkBaseException e) {
logger.error("emr task submit failed with error", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new TaskException("Execute emr task failed", e);
} finally {
final int exitStatusCode = calculateExitStatusCode(stepStatus);
setExitStatusCode(exitStatusCode);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
index ed42de2831..bc361f6538 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.emr;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.HashSet;
@@ -55,7 +56,7 @@ public class EmrJobFlowTask extends AbstractEmrTask {
}
@Override
- public void handle() throws InterruptedException {
+ public void handle() throws TaskException {
ClusterStatus clusterStatus = null;
try {
RunJobFlowRequest runJobFlowRequest = createRunJobFlowRequest();
@@ -76,6 +77,9 @@ public class EmrJobFlowTask extends AbstractEmrTask {
} catch (EmrTaskException | SdkBaseException e) {
logger.error("emr task submit failed with error", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new TaskException("Execute emr task failed", e);
} finally {
final int exitStatusCode = calculateExitStatusCode(clusterStatus);
setExitStatusCode(exitStatusCode);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
index 47bb7e9b21..df17725cf2 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.http;
import static org.apache.dolphinscheduler.plugin.task.http.HttpTaskConstants.APPLICATION_JSON;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@@ -89,7 +90,7 @@ public class HttpTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
long startTime = System.currentTimeMillis();
String formatTimeStamp = DateUtils.formatTimeStamp(startTime);
String statusCode = null;
@@ -108,7 +109,7 @@ public class HttpTask extends AbstractTaskExecutor {
appendMessage(e.toString());
exitStatusCode = -1;
logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:" + output, e);
- throw e;
+ throw new TaskException("Execute http task failed", e);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
index 17f4946e19..ab135ac99e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
@@ -17,28 +17,27 @@
package org.apache.dolphinscheduler.plugin.task.jupyter;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
-import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
public class JupyterTask extends AbstractTaskExecutor {
/**
@@ -78,17 +77,22 @@ public class JupyterTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
try {
// SHELL task exit code
TaskResponse response = shellCommandExecutor.run(buildCommand());
setExitStatusCode(response.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(response.getProcessId());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("The current Jupyter task has been interrupted", e);
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ throw new TaskException("The current Jupyter task has been interrupted", e);
} catch (Exception e) {
logger.error("jupyter task execution failure", e);
exitStatusCode = -1;
- throw e;
+ throw new TaskException("Execute jupyter task failed", e);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
index 6e49da322e..0c42a5c09b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
@@ -17,12 +17,11 @@
package org.apache.dolphinscheduler.plugin.task.mlflow;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@@ -36,6 +35,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+
/**
* shell task
*/
@@ -80,7 +81,7 @@ public class MlflowTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
try {
// construct process
String command = buildCommand();
@@ -95,10 +96,15 @@ public class MlflowTask extends AbstractTaskExecutor {
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("The current Mlflow task has been interrupted", e);
+ setExitStatusCode(EXIT_CODE_FAILURE);
+ throw new TaskException("The current Mlflow task has been interrupted", e);
} catch (Exception e) {
- logger.error("shell task error", e);
+ logger.error("Mlflow task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
- throw e;
+ throw new TaskException("Execute Mlflow task failed", e);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
index 028eddb1c1..e5771d588d 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
@@ -17,14 +17,14 @@
package org.apache.dolphinscheduler.plugin.task.pigeon;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -33,6 +33,8 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.handshake.ServerHandshake;
import java.net.HttpURLConnection;
import java.net.URI;
@@ -42,9 +44,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
-import org.java_websocket.client.WebSocketClient;
-import org.java_websocket.handshake.ServerHandshake;
-
/**
* TIS DataX Task
**/
@@ -74,7 +73,7 @@ public class PigeonTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
// Trigger PIGEON DataX pipeline
logger.info("start execute PIGEON task");
long startTime = System.currentTimeMillis();
@@ -150,6 +149,7 @@ public class PigeonTask extends AbstractTaskExecutor {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
+ throw new TaskException("Execute pigeon task failed", e);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index fc886983e5..02e960ebdb 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -17,19 +17,16 @@
package org.apache.dolphinscheduler.plugin.task.procedure;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
-
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
-import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -44,6 +41,9 @@ import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
+
/**
* procedure task
*/
@@ -84,7 +84,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
logger.info("procedure type : {}, datasource : {}, method : {} , localParams : {}",
procedureParameters.getType(),
procedureParameters.getDatasource(),
@@ -123,7 +123,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
} catch (Exception e) {
setExitStatusCode(EXIT_CODE_FAILURE);
logger.error("procedure task error", e);
- throw e;
+ throw new TaskException("Execute procedure task failed", e);
} finally {
close(stmt, connection);
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index 75dae471e4..6c18d206d9 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -100,7 +100,7 @@ public class PythonTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
try {
// generate the content of this python script
String pythonScriptContent = buildPythonScriptContent();
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
index b738194d4e..3cd7c7c4f8 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
@@ -65,7 +65,7 @@ public class PytorchTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
try {
String command = buildPythonExecuteCommand();
TaskResponse taskResponse = shellCommandExecutor.run(command);
@@ -73,9 +73,14 @@ public class PytorchTask extends AbstractTaskExecutor {
setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("The current Pytorch task has been interrupted", e);
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ throw new TaskException("The current Pytorch task has been interrupted", e);
} catch (Exception e) {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
- throw e;
+ throw new TaskException("Pytorch task execute failed", e);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
index 1431da1e92..2543e60614 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
@@ -24,6 +24,7 @@ import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_G
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
@@ -80,13 +81,13 @@ public class SagemakerTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws SagemakerTaskException {
+ public void handle() throws TaskException {
try {
int exitStatusCode = handleStartPipeline();
setExitStatusCode(exitStatusCode);
} catch (Exception e) {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
- throw new SagemakerTaskException("SageMaker task error", e);
+ throw new TaskException("SageMaker task error", e);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index b44bc6eeac..cd3d8ebb22 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -17,12 +17,12 @@
package org.apache.dolphinscheduler.plugin.task.seatunnel;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
-
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.BooleanUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@@ -30,9 +30,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.BooleanUtils;
-
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -42,6 +39,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
+
/**
* seatunnel task
*/
@@ -85,7 +85,7 @@ public class SeatunnelTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
try {
// construct process
String command = buildCommand();
@@ -94,10 +94,15 @@ public class SeatunnelTask extends AbstractTaskExecutor {
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
seatunnelParameters.dealOutParam(shellCommandExecutor.getVarPool());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("The current SeaTunnel task has been interrupted", e);
+ setExitStatusCode(EXIT_CODE_FAILURE);
+ throw new TaskException("The current SeaTunnel task has been interrupted", e);
} catch (Exception e) {
logger.error("SeaTunnel task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
- throw e;
+ throw new TaskException("Execute Seatunnel task failed", e);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index f333e617b5..7e20bed634 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -17,12 +17,11 @@
package org.apache.dolphinscheduler.plugin.task.shell;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
-
+import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@@ -31,8 +30,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-import org.apache.commons.lang3.SystemUtils;
-
import java.io.File;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
@@ -44,6 +41,9 @@ import java.nio.file.attribute.PosixFilePermissions;
import java.util.Map;
import java.util.Set;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
+
/**
* shell task
*/
@@ -90,7 +90,7 @@ public class ShellTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
try {
// construct process
String command = buildCommand();
@@ -99,10 +99,15 @@ public class ShellTask extends AbstractTaskExecutor {
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("The current Shell task has been interrupted", e);
+ setExitStatusCode(EXIT_CODE_FAILURE);
+ throw new TaskException("The current Shell task has been interrupted", e);
} catch (Exception e) {
logger.error("shell task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
- throw e;
+ throw new TaskException("Execute shell task error", e);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index 6e4e6f14c9..f884f46e5e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.sql;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
@@ -39,8 +42,7 @@ import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import org.apache.commons.collections4.CollectionUtils;
+import org.slf4j.Logger;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -58,11 +60,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import org.slf4j.Logger;
-
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
public class SqlTask extends AbstractTaskExecutor {
/**
@@ -117,7 +114,7 @@ public class SqlTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
logger.info("Full sql parameters: {}", sqlParameters);
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}",
sqlParameters.getType(),
@@ -163,7 +160,7 @@ public class SqlTask extends AbstractTaskExecutor {
} catch (Exception e) {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
logger.error("sql task error", e);
- throw e;
+ throw new TaskException("Execute sql task failed", e);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index f46b7ce56c..f51b352d9c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -17,13 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.zeppelin;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import kong.unirest.Unirest;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.NoteResult;
import org.apache.zeppelin.client.ParagraphResult;
@@ -34,10 +36,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import kong.unirest.Unirest;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
public class ZeppelinTask extends AbstractTaskExecutor {
/**
@@ -77,7 +75,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws Exception {
+ public void handle() throws TaskException {
try {
final String paragraphId = this.zeppelinParameters.getParagraphId();
final String productionNoteDirectory = this.zeppelinParameters.getProductionNoteDirectory();
@@ -142,6 +140,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
} catch (Exception e) {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
logger.error("zeppelin task submit failed with error", e);
+ throw new TaskException("Execute ZeppelinTask exception");
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index 397d68347e..ff70ba5b45 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -28,6 +28,7 @@ import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
@@ -127,10 +128,10 @@ public class ZeppelinTaskTest {
Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode());
}
- @Test
+ @Test(expected = TaskException.class)
public void testHandleWithParagraphExecutionException() throws Exception {
when(this.zClient.executeParagraph(any(), any(), any(Map.class))).
- thenThrow(new Exception("Something wrong happens from zeppelin side"));
+ thenThrow(new TaskException("Something wrong happens from zeppelin side"));
// when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR);
this.zeppelinTask.handle();
Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
index 17af3e9a47..4bb0c92124 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
@@ -17,14 +17,14 @@
package org.apache.dolphinscheduler.server.worker.processor;
-import org.apache.dolphinscheduler.common.Constants;
+import com.google.common.base.Preconditions;
+import io.micrometer.core.annotation.Counted;
+import io.micrometer.core.annotation.Timed;
+import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -36,26 +36,16 @@ import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerDelayTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnableFactoryBuilder;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-
-import org.apache.commons.lang.SystemUtils;
-
-import java.util.Date;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.google.common.base.Preconditions;
-
-import io.micrometer.core.annotation.Counted;
-import io.micrometer.core.annotation.Timed;
-import io.netty.channel.Channel;
-
/**
* Used to handle {@link CommandType#TASK_DISPATCH_REQUEST}
*/
@@ -104,7 +94,7 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
logger.error("task execute request command content is null");
return;
}
- final String masterAddress = taskDispatchCommand.getMessageSenderAddress();
+ final String workflowMasterAddress = taskDispatchCommand.getMessageSenderAddress();
logger.info("task execute request message: {}", taskDispatchCommand);
TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext();
@@ -114,111 +104,39 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
return;
}
try {
- LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId());
-
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
-
// set cache, it will be used when kill task
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
-
- // todo custom logger
-
taskExecutionContext.setHost(workerConfig.getWorkerAddress());
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
- if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
- boolean osUserExistFlag;
- // if Using distributed is true and Currently supported systems are linux,Should not let it
- // automatically
- // create tenants,so TenantAutoCreate has no effect
- if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) {
- // use the id command to judge in linux
- osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode());
- } else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
- // if not exists this user, then create
- OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
- osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
- } else {
- osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
- }
-
- // check if the OS user exists
- if (!osUserExistFlag) {
- logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
- taskExecutionContext.getTenantCode(),
- taskExecutionContext.getTaskInstanceId());
- TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
- taskExecutionContext.setEndTime(new Date());
- workerMessageSender.sendMessageWithRetry(taskExecutionContext,
- masterAddress,
- CommandType.TASK_EXECUTE_RESULT);
- return;
- }
-
- // local execute path
- String execLocalPath = getExecLocalPath(taskExecutionContext);
- logger.info("task instance local execute path : {}", execLocalPath);
- taskExecutionContext.setExecutePath(execLocalPath);
-
- try {
- FileUtils.createWorkDirIfAbsent(execLocalPath);
- } catch (Throwable ex) {
- logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
- execLocalPath,
- taskExecutionContext.getTaskInstanceId(),
- ex);
- TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
- workerMessageSender.sendMessageWithRetry(taskExecutionContext,
- masterAddress,
- CommandType.TASK_EXECUTE_RESULT);
- return;
- }
- }
-
// delay task process
- long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
- taskExecutionContext.getDelayTime() * 60L);
+ long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
- logger.info("delay the execution of task instance {}, delay time: {} s",
- taskExecutionContext.getTaskInstanceId(),
- remainTime);
+ logger.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
- taskExecutionContext.setStartTime(null);
- workerMessageSender.sendMessage(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
+ workerMessageSender.sendMessage(taskExecutionContext, workflowMasterAddress, CommandType.TASK_EXECUTE_RESULT);
}
+ WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder.createWorkerDelayTaskExecuteRunnableFactory(
+ taskExecutionContext,
+ workerConfig,
+ workflowMasterAddress,
+ workerMessageSender,
+ alertClientService,
+ taskPluginManager,
+ storageOperate)
+ .createWorkerTaskExecuteRunnable();
// submit task to manager
- boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext,
- masterAddress,
- workerMessageSender,
- alertClientService,
- taskPluginManager,
- storageOperate));
+ boolean offer = workerManager.offer(workerTaskExecuteRunnable);
if (!offer) {
- logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}",
- workerManager.getWaitSubmitQueueSize(),
- taskExecutionContext.getTaskInstanceId());
- workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_REJECT);
+ logger.warn("submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master", workerManager.getWaitSubmitQueueSize());
+ workerMessageSender.sendMessageWithRetry(taskExecutionContext, workflowMasterAddress, CommandType.TASK_REJECT);
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
- /**
- * get execute local path
- *
- * @param taskExecutionContext taskExecutionContext
- * @return execute local path
- */
- private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
- return FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
- taskExecutionContext.getProcessDefineCode(),
- taskExecutionContext.getProcessDefineVersion(),
- taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId());
- }
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index f4f85277df..fd7e1d726c 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -17,6 +17,11 @@
package org.apache.dolphinscheduler.server.worker.processor;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
@@ -34,25 +39,17 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
-import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
import org.apache.dolphinscheduler.service.log.LogClientService;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
/**
* task kill processor
@@ -161,12 +158,12 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @param taskInstanceId
*/
protected void cancelApplication(int taskInstanceId) {
- TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId);
- if (taskExecuteThread == null) {
+ WorkerTaskExecuteRunnable workerTaskExecuteRunnable = workerManager.getTaskExecuteThread(taskInstanceId);
+ if (workerTaskExecuteRunnable == null) {
logger.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId);
return;
}
- AbstractTask task = taskExecuteThread.getTask();
+ AbstractTask task = workerTaskExecuteRunnable.getTask();
if (task == null) {
logger.warn("task not found, taskInstanceId:{}", taskInstanceId);
return;
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
index 5401c66859..899ac7d9b7 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
@@ -17,6 +17,10 @@
package org.apache.dolphinscheduler.server.worker.processor;
+import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -27,20 +31,13 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskSavePointRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskSavePointResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
-
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.google.common.base.Preconditions;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-
/**
* task save point processor
*/
@@ -98,12 +95,12 @@ public class TaskSavePointProcessor implements NettyRequestProcessor {
}
protected void doSavePoint(int taskInstanceId) {
- TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId);
- if (taskExecuteThread == null) {
+ WorkerTaskExecuteRunnable workerTaskExecuteRunnable = workerManager.getTaskExecuteThread(taskInstanceId);
+ if (workerTaskExecuteRunnable == null) {
logger.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId);
return;
}
- AbstractTask task = taskExecuteThread.getTask();
+ AbstractTask task = workerTaskExecuteRunnable.getTask();
if (task == null) {
logger.warn("task not found, taskInstanceId:{}", taskInstanceId);
return;
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
new file mode 100644
index 0000000000..1b63b063e8
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+
+import javax.annotation.Nullable;
+
+public class DefaultWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecuteRunnable {
+
+ public DefaultWorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext,
+ @NonNull WorkerConfig workerConfig,
+ @NonNull String workflowMaster,
+ @NonNull WorkerMessageSender workerMessageSender,
+ @NonNull AlertClientService alertClientService,
+ @NonNull TaskPluginManager taskPluginManager,
+ @Nullable StorageOperate storageOperate) {
+ super(taskExecutionContext, workerConfig, workflowMaster, workerMessageSender, alertClientService, taskPluginManager, storageOperate);
+ }
+
+ @Override
+ public void executeTask() throws TaskException {
+ if (task == null) {
+ throw new TaskException("The task plugin instance is not initialized");
+ }
+ task.handle();
+ }
+
+ @Override
+ protected void afterExecute() {
+ super.afterExecute();
+ }
+
+ @Override
+ protected void afterThrowing(Throwable throwable) throws TaskException {
+ super.afterThrowing(throwable);
+ }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
new file mode 100644
index 0000000000..53de57ec68
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+
+import javax.annotation.Nullable;
+
+public class DefaultWorkerDelayTaskExecuteRunnableFactory extends WorkerDelayTaskExecuteRunnableFactory<DefaultWorkerDelayTaskExecuteRunnable> {
+
+ protected DefaultWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext,
+ @NonNull WorkerConfig workerConfig,
+ @NonNull String workflowMasterAddress,
+ @NonNull WorkerMessageSender workerMessageSender,
+ @NonNull AlertClientService alertClientService,
+ @NonNull TaskPluginManager taskPluginManager,
+ @Nullable StorageOperate storageOperate) {
+ super(taskExecutionContext, workerConfig, workflowMasterAddress, workerMessageSender, alertClientService, taskPluginManager, storageOperate);
+ }
+
+ @Override
+ public DefaultWorkerDelayTaskExecuteRunnable createWorkerTaskExecuteRunnable() {
+ return new DefaultWorkerDelayTaskExecuteRunnable(
+ taskExecutionContext,
+ workerConfig,
+ workflowMasterAddress,
+ workerMessageSender,
+ alertClientService,
+ taskPluginManager,
+ storageOperate);
+ }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
deleted file mode 100644
index 9b7e8c7e85..0000000000
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import com.google.common.base.Strings;
-import lombok.NonNull;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.WarningType;
-import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
-import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
-import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.server.utils.ProcessUtils;
-import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.exceptions.ServiceException;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Paths;
-import java.util.*;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
-
-/**
- * task scheduler thread
- */
-public class TaskExecuteThread implements Runnable, Delayed {
-
- /**
- * logger
- */
- private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
-
- /**
- * task instance
- */
- private final TaskExecutionContext taskExecutionContext;
-
- private final String masterAddress;
-
- private final StorageOperate storageOperate;
-
- /**
- * abstract task
- */
- private AbstractTask task;
-
- /**
- * task callback service
- */
- private final WorkerMessageSender workerMessageSender;
-
- /**
- * alert client server
- */
- private final AlertClientService alertClientService;
-
- private TaskPluginManager taskPluginManager;
-
- /**
- * constructor
- *
- * @param taskExecutionContext taskExecutionContext
- * @param workerMessageSender used for worker send message to master
- */
- public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext,
- @NonNull String masterAddress,
- @NonNull WorkerMessageSender workerMessageSender,
- @NonNull AlertClientService alertClientService,
- StorageOperate storageOperate) {
- this.taskExecutionContext = taskExecutionContext;
- this.masterAddress = masterAddress;
- this.workerMessageSender = workerMessageSender;
- this.alertClientService = alertClientService;
- this.storageOperate = storageOperate;
- }
-
- public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext,
- @NonNull String masterAddress,
- @NonNull WorkerMessageSender workerMessageSender,
- @NonNull AlertClientService alertClientService,
- @NonNull TaskPluginManager taskPluginManager,
- StorageOperate storageOperate) {
- this.taskExecutionContext = taskExecutionContext;
- this.masterAddress = masterAddress;
- this.workerMessageSender = workerMessageSender;
- this.alertClientService = alertClientService;
- this.taskPluginManager = taskPluginManager;
- this.storageOperate = storageOperate;
- }
-
- @Override
- public void run() {
- try {
- LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId());
- if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
- taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
- taskExecutionContext.setStartTime(new Date());
- taskExecutionContext.setEndTime(new Date());
- TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- workerMessageSender.sendMessageWithRetry(taskExecutionContext,
- masterAddress,
- CommandType.TASK_EXECUTE_RESULT);
- logger.info("Task dry run success");
- return;
- }
- } finally {
- LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
- }
- try {
- LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId());
- logger.info("script path : {}", taskExecutionContext.getExecutePath());
- if (taskExecutionContext.getStartTime() == null) {
- taskExecutionContext.setStartTime(new Date());
- }
- logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
-
- // callback task execute running
- taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
- workerMessageSender.sendMessageWithRetry(taskExecutionContext,
- masterAddress,
- CommandType.TASK_EXECUTE_RUNNING);
-
- // copy hdfs/minio file to local
- List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(),
- taskExecutionContext.getResources());
- if (!fileDownloads.isEmpty()) {
- downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
- }
-
- taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
-
- taskExecutionContext.setTaskAppId(String.format("%s_%s",
- taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId()));
-
- TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
- if (null == taskChannel) {
- throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.",
- taskExecutionContext.getTaskType()));
- }
- String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
- taskExecutionContext.getProcessDefineCode(),
- taskExecutionContext.getProcessDefineVersion(),
- taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId());
- taskExecutionContext.setTaskLogName(taskLogName);
-
- // set the name of the current thread
- Thread.currentThread().setName(taskLogName);
-
- task = taskChannel.createTask(taskExecutionContext);
-
- // task init
- this.task.init();
-
- // init varPool
- this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
-
- // task handle
- this.task.handle();
-
- // task result process
- if (this.task.getNeedAlert()) {
- sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus());
- }
-
- taskExecutionContext.setCurrentExecutionStatus(this.task.getExitStatus());
- taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
- taskExecutionContext.setProcessId(this.task.getProcessId());
- taskExecutionContext.setAppIds(this.task.getAppIds());
- taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
- logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(),
- this.task.getExitStatus());
- } catch (Throwable e) {
- logger.error("task scheduler failure", e);
- kill();
- taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
- taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
- taskExecutionContext.setProcessId(this.task.getProcessId());
- taskExecutionContext.setAppIds(this.task.getAppIds());
- } finally {
- TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- workerMessageSender.sendMessageWithRetry(taskExecutionContext,
- masterAddress,
- CommandType.TASK_EXECUTE_RESULT);
- clearTaskExecPath();
- LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
- }
- }
-
- private void sendAlert(TaskAlertInfo taskAlertInfo, TaskExecutionStatus status) {
- int strategy =
- status == TaskExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode();
- alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(),
- taskAlertInfo.getContent(), strategy);
- }
-
- /**
- * when task finish, clear execute path.
- */
- private void clearTaskExecPath() {
- logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
-
- if (!CommonUtils.isDevelopMode()) {
- // get exec dir
- String execLocalPath = taskExecutionContext.getExecutePath();
-
- if (Strings.isNullOrEmpty(execLocalPath)) {
- logger.warn("task: {} exec local path is empty.", taskExecutionContext.getTaskName());
- return;
- }
-
- if (SINGLE_SLASH.equals(execLocalPath)) {
- logger.warn("task: {} exec local path is '/', direct deletion is not allowed",
- taskExecutionContext.getTaskName());
- return;
- }
-
- try {
- org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
- logger.info("exec local path: {} cleared.", execLocalPath);
- } catch (IOException e) {
- if (e instanceof NoSuchFileException) {
- // this is expected
- } else {
- logger.error("Delete exec dir failed.", e);
- }
- }
- }
- }
-
- /**
- * kill task
- */
- public void kill() {
- if (task != null) {
- try {
- task.cancelApplication(true);
- ProcessUtils.killYarnJob(taskExecutionContext);
- } catch (Exception e) {
- logger.error("Kill task failed", e);
- }
- }
- }
-
- /**
- * download resource file
- *
- * @param execLocalPath execLocalPath
- * @param fileDownloads projectRes
- * @param logger logger
- */
- public void downloadResource(String execLocalPath, Logger logger, List<Pair<String, String>> fileDownloads) {
- for (Pair<String, String> fileDownload : fileDownloads) {
- try {
- // query the tenant code of the resource according to the name of the resource
- String fullName = fileDownload.getLeft();
- String tenantCode = fileDownload.getRight();
- String resPath = storageOperate.getResourceFileName(tenantCode, fullName);
- logger.info("get resource file from path:{}", resPath);
- long resourceDownloadStartTime = System.currentTimeMillis();
- storageOperate.download(tenantCode, resPath, execLocalPath + File.separator + fullName, false, true);
- WorkerServerMetrics
- .recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);
- WorkerServerMetrics.recordWorkerResourceDownloadSize(
- Files.size(Paths.get(execLocalPath, fullName)));
- WorkerServerMetrics.incWorkerResourceDownloadSuccessCount();
- } catch (Exception e) {
- WorkerServerMetrics.incWorkerResourceDownloadFailureCount();
- logger.error(e.getMessage(), e);
- throw new ServiceException(e.getMessage());
- }
- }
- }
-
- /**
- * download resource check
- *
- * @param execLocalPath
- * @param projectRes
- * @return
- */
- public List<Pair<String, String>> downloadCheck(String execLocalPath, Map<String, String> projectRes) {
- if (MapUtils.isEmpty(projectRes)) {
- return Collections.emptyList();
- }
- List<Pair<String, String>> downloadFile = new ArrayList<>();
- projectRes.forEach((key, value) -> {
- File resFile = new File(execLocalPath, key);
- boolean notExist = !resFile.exists();
- if (notExist) {
- downloadFile.add(Pair.of(key, value));
- } else {
- logger.info("file : {} exists ", resFile.getName());
- }
- });
- if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()) {
- throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
- }
- return downloadFile;
- }
-
- /**
- * get current TaskExecutionContext
- *
- * @return TaskExecutionContext
- */
- public TaskExecutionContext getTaskExecutionContext() {
- return this.taskExecutionContext;
- }
-
- @Override
- public long getDelay(TimeUnit unit) {
- return unit.convert(DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
- taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS);
- }
-
- @Override
- public int compareTo(Delayed o) {
- if (o == null) {
- return 1;
- }
- return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
- }
-
- public AbstractTask getTask() {
- return task;
- }
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
new file mode 100644
index 0000000000..73e14c5132
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+public abstract class WorkerDelayTaskExecuteRunnable extends WorkerTaskExecuteRunnable implements Delayed {
+
+ protected WorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext,
+ @NonNull WorkerConfig workerConfig,
+ @NonNull String masterAddress,
+ @NonNull WorkerMessageSender workerMessageSender,
+ @NonNull AlertClientService alertClientService,
+ @NonNull TaskPluginManager taskPluginManager,
+ @Nullable StorageOperate storageOperate) {
+ super(taskExecutionContext, workerConfig, masterAddress, workerMessageSender, alertClientService, taskPluginManager, storageOperate);
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ TaskExecutionContext taskExecutionContext = getTaskExecutionContext();
+ return unit.convert(
+ DateUtils.getRemainTime(
+ taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS);
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ if (o == null) {
+ return 1;
+ }
+ return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
+ }
+
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
new file mode 100644
index 0000000000..44bb8878d4
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+
+import javax.annotation.Nullable;
+
+public abstract class WorkerDelayTaskExecuteRunnableFactory<T extends WorkerDelayTaskExecuteRunnable> implements WorkerTaskExecuteRunnableFactory<T> {
+
+ protected final @NonNull TaskExecutionContext taskExecutionContext;
+ protected final @NonNull WorkerConfig workerConfig;
+ protected final @NonNull String workflowMasterAddress;
+ protected final @NonNull WorkerMessageSender workerMessageSender;
+ protected final @NonNull AlertClientService alertClientService;
+ protected final @NonNull TaskPluginManager taskPluginManager;
+ protected final @Nullable StorageOperate storageOperate;
+
+ protected WorkerDelayTaskExecuteRunnableFactory(
+ @NonNull TaskExecutionContext taskExecutionContext,
+ @NonNull WorkerConfig workerConfig,
+ @NonNull String workflowMasterAddress,
+ @NonNull WorkerMessageSender workerMessageSender,
+ @NonNull AlertClientService alertClientService,
+ @NonNull TaskPluginManager taskPluginManager,
+ @Nullable StorageOperate storageOperate) {
+ this.taskExecutionContext = taskExecutionContext;
+ this.workerConfig = workerConfig;
+ this.workflowMasterAddress = workflowMasterAddress;
+ this.workerMessageSender = workerMessageSender;
+ this.alertClientService = alertClientService;
+ this.taskPluginManager = taskPluginManager;
+ this.storageOperate = storageOperate;
+ }
+
+
+ public abstract T createWorkerTaskExecuteRunnable();
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
index 46ecfd0d5b..3bdf9534a1 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
@@ -50,17 +50,17 @@ public class WorkerExecService {
/**
* running task
*/
- private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap;
+ private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap;
public WorkerExecService(ExecutorService execService,
- ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap) {
+ ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap) {
this.execService = execService;
this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
this.taskExecuteThreadMap = taskExecuteThreadMap;
WorkerServerMetrics.registerWorkerRunningTaskGauge(taskExecuteThreadMap::size);
}
- public void submit(TaskExecuteThread taskExecuteThread) {
+ public void submit(final WorkerTaskExecuteRunnable taskExecuteThread) {
taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
FutureCallback futureCallback = new FutureCallback() {
@@ -91,7 +91,7 @@ public class WorkerExecService {
return ((ThreadPoolExecutor) this.execService).getQueue().size();
}
- public Map<Integer, TaskExecuteThread> getTaskExecuteThreadMap() {
+ public Map<Integer, WorkerTaskExecuteRunnable> getTaskExecuteThreadMap() {
return taskExecuteThreadMap;
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 2466700da1..41824a9a44 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -19,17 +19,15 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
-import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.concurrent.BlockingQueue;
+import javax.annotation.Nullable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
@@ -41,31 +39,16 @@ public class WorkerManagerThread implements Runnable {
private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);
- /**
- * task queue
- */
- private final BlockingQueue<TaskExecuteThread> waitSubmitQueue;
+ private final DelayQueue<WorkerDelayTaskExecuteRunnable> waitSubmitQueue;
- @Autowired(required = false)
- private StorageOperate storageOperate;
-
- /**
- * thread executor service
- */
private final WorkerExecService workerExecService;
- /**
- * task callback service
- */
- @Autowired
- private WorkerMessageSender workerMessageSender;
-
- private volatile int workerExecThreads;
+ private final int workerExecThreads;
/**
* running task
*/
- private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap = new ConcurrentHashMap<>();
public WorkerManagerThread(WorkerConfig workerConfig) {
workerExecThreads = workerConfig.getExecThreads();
@@ -75,8 +58,8 @@ public class WorkerManagerThread implements Runnable {
taskExecuteThreadMap);
}
- public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) {
- return this.taskExecuteThreadMap.get(taskInstanceId);
+ public @Nullable WorkerTaskExecuteRunnable getTaskExecuteThread(Integer taskInstanceId) {
+ return taskExecuteThreadMap.get(taskInstanceId);
}
/**
@@ -94,7 +77,7 @@ public class WorkerManagerThread implements Runnable {
* @return queue size
*/
public int getThreadPoolQueueSize() {
- return this.workerExecService.getThreadPoolQueueSize();
+ return workerExecService.getThreadPoolQueueSize();
}
/**
@@ -108,13 +91,7 @@ public class WorkerManagerThread implements Runnable {
.forEach(waitSubmitQueue::remove);
}
- /**
- * submit task
- *
- * @param taskExecuteThread taskExecuteThread
- * @return submit result
- */
- public boolean offer(TaskExecuteThread taskExecuteThread) {
+ public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
if (waitSubmitQueue.size() > workerExecThreads) {
WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
// if waitSubmitQueue is full, it will wait 1s, then try add
@@ -123,7 +100,7 @@ public class WorkerManagerThread implements Runnable {
return false;
}
}
- return waitSubmitQueue.offer(taskExecuteThread);
+ return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);
}
public void start() {
@@ -137,15 +114,14 @@ public class WorkerManagerThread implements Runnable {
@Override
public void run() {
Thread.currentThread().setName("Worker-Execute-Manager-Thread");
- TaskExecuteThread taskExecuteThread;
while (!ServerLifeCycleManager.isStopped()) {
try {
if (!ServerLifeCycleManager.isRunning()) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
if (this.getThreadPoolQueueSize() <= workerExecThreads) {
- taskExecuteThread = waitSubmitQueue.take();
- workerExecService.submit(taskExecuteThread);
+ final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
+ workerExecService.submit(workerDelayTaskExecuteRunnable);
} else {
WorkerServerMetrics.incWorkerOverloadCount();
logger.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}",
@@ -161,7 +137,17 @@ public class WorkerManagerThread implements Runnable {
public void clearTask() {
waitSubmitQueue.clear();
- workerExecService.getTaskExecuteThreadMap().values().forEach(TaskExecuteThread::kill);
+ workerExecService.getTaskExecuteThreadMap().values().forEach(workerTaskExecuteRunnable -> {
+ int taskInstanceId = workerTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
+ try {
+ workerTaskExecuteRunnable.cancelTask();
+ logger.info("Cancel the taskInstance in worker {}", taskInstanceId);
+ } catch (Exception ex) {
+ logger.error("Cancel the taskInstance error {}", taskInstanceId, ex);
+ } finally {
+ TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
+ }
+ });
workerExecService.getTaskExecuteThreadMap().clear();
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
new file mode 100644
index 0000000000..bd8beafd6b
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import com.google.common.base.Strings;
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.Date;
+
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+
+public abstract class WorkerTaskExecuteRunnable implements Runnable {
+
+ protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, WorkerTaskExecuteRunnable.class));
+
+ protected final TaskExecutionContext taskExecutionContext;
+ protected final WorkerConfig workerConfig;
+ protected final String masterAddress;
+ protected final WorkerMessageSender workerMessageSender;
+ protected final AlertClientService alertClientService;
+ protected final TaskPluginManager taskPluginManager;
+ protected final @Nullable StorageOperate storageOperate;
+
+ protected @Nullable AbstractTask task;
+
+ protected WorkerTaskExecuteRunnable(
+ @NonNull TaskExecutionContext taskExecutionContext,
+ @NonNull WorkerConfig workerConfig,
+ @NonNull String masterAddress,
+ @NonNull WorkerMessageSender workerMessageSender,
+ @NonNull AlertClientService alertClientService,
+ @NonNull TaskPluginManager taskPluginManager,
+ @Nullable StorageOperate storageOperate) {
+ this.taskExecutionContext = taskExecutionContext;
+ this.workerConfig = workerConfig;
+ this.masterAddress = masterAddress;
+ this.workerMessageSender = workerMessageSender;
+ this.alertClientService = alertClientService;
+ this.taskPluginManager = taskPluginManager;
+ this.storageOperate = storageOperate;
+ String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
+ taskExecutionContext.getProcessDefineCode(),
+ taskExecutionContext.getProcessDefineVersion(),
+ taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId());
+ taskExecutionContext.setTaskLogName(taskLogName);
+ logger.info("Set task logger name: {}", taskLogName);
+ }
+
+ protected abstract void executeTask();
+
+ protected void afterExecute() throws TaskException {
+ if (task == null) {
+ throw new TaskException("The current task instance is null");
+ }
+ sendAlertIfNeeded();
+
+ sendTaskResult();
+
+ TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ logger.info("Remove the current task execute context from worker cache");
+ clearTaskExecPathIfNeeded();
+ }
+
+ protected void afterThrowing(Throwable throwable) throws TaskException {
+ cancelTask();
+ TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
+ taskExecutionContext.setEndTime(new Date());
+ workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
+ logger.info("Get a exception when execute the task, will send the task execute result to master, the current task execute result is {}", TaskExecutionStatus.FAILURE);
+ }
+
+ public void cancelTask() {
+ // cancel the task
+ if (task != null) {
+ try {
+ task.cancelApplication(true);
+ ProcessUtils.killYarnJob(taskExecutionContext);
+ } catch (Exception e) {
+ logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ // set the thread name to make sure the log be written to the task log file
+ Thread.currentThread().setName(taskExecutionContext.getTaskLogName());
+
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
+ logger.info("Begin to pulling task");
+
+ initializeTask();
+
+ if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
+ taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
+ taskExecutionContext.setEndTime(new Date());
+ TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
+ logger.info("The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
+ return;
+ }
+
+ beforeExecute();
+
+ executeTask();
+
+ afterExecute();
+
+ } catch (Throwable ex) {
+ logger.error("Task execute failed, due to meet an exception", ex);
+ afterThrowing(ex);
+ } finally {
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+ }
+ }
+
+ protected void initializeTask() {
+ logger.info("Begin to initialize task");
+
+ Date taskStartTime = new Date();
+ taskExecutionContext.setStartTime(taskStartTime);
+ logger.info("Set task startTime: {}", taskStartTime);
+
+ String systemEnvPath = CommonUtils.getSystemEnvPath();
+ taskExecutionContext.setEnvFile(systemEnvPath);
+ logger.info("Set task envFile: {}", systemEnvPath);
+
+ String taskAppId = String.format("%s_%s", taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
+ taskExecutionContext.setTaskAppId(taskAppId);
+ logger.info("Set task appId: {}", taskAppId);
+
+ logger.info("End initialize task");
+ }
+
+ protected void beforeExecute() {
+ taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
+ workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RUNNING);
+ logger.info("Set task status to {}", TaskExecutionStatus.RUNNING_EXECUTION);
+
+ TaskExecutionCheckerUtils.checkTenantExist(workerConfig, taskExecutionContext);
+ logger.info("TenantCode:{} check success", taskExecutionContext.getTenantCode());
+
+ TaskExecutionCheckerUtils.createProcessLocalPathIfAbsent(taskExecutionContext);
+ logger.info("ProcessExecDir:{} check success", taskExecutionContext.getExecutePath());
+
+ TaskExecutionCheckerUtils.downloadResourcesIfNeeded(storageOperate, taskExecutionContext, logger);
+ logger.info("Resources:{} check success", taskExecutionContext.getResources());
+
+ TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
+ if (null == taskChannel) {
+ throw new TaskPluginException(String.format("%s task plugin not found, please check config file.", taskExecutionContext.getTaskType()));
+ }
+ task = taskChannel.createTask(taskExecutionContext);
+ if (task == null) {
+ throw new TaskPluginException(String.format("%s task is null, please check the task plugin is correct", taskExecutionContext.getTaskType()));
+ }
+ logger.info("Task plugin: {} create success", taskExecutionContext.getTaskType());
+
+ task.init();
+ logger.info("Success initialized task plugin instance success");
+
+ task.getParameters().setVarPool(taskExecutionContext.getVarPool());
+ logger.info("Success set taskVarPool: {}", taskExecutionContext.getVarPool());
+
+ }
+
+ protected void sendAlertIfNeeded() {
+ if (!task.getNeedAlert()) {
+ return;
+ }
+ logger.info("The current task need to send alert, begin to send alert");
+ TaskExecutionStatus status = task.getExitStatus();
+ TaskAlertInfo taskAlertInfo = task.getTaskAlertInfo();
+ int strategy = status == TaskExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode();
+ alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(), taskAlertInfo.getContent(), strategy);
+ logger.info("Success send alert");
+ }
+
+ protected void sendTaskResult() {
+ taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());
+ taskExecutionContext.setEndTime(new Date());
+ taskExecutionContext.setProcessId(task.getProcessId());
+ taskExecutionContext.setAppIds(task.getAppIds());
+ taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
+ workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
+
+ logger.info("Send task execute result to master, the current task status: {}", taskExecutionContext.getCurrentExecutionStatus());
+ }
+
+ protected void clearTaskExecPathIfNeeded() {
+
+ String execLocalPath = taskExecutionContext.getExecutePath();
+ if (!CommonUtils.isDevelopMode()) {
+ logger.info("The current execute mode isn't develop mode, will clear the task execute file: {}", execLocalPath);
+ // get exec dir
+ if (Strings.isNullOrEmpty(execLocalPath)) {
+ logger.warn("The task execute file is {} no need to clear", taskExecutionContext.getTaskName());
+ return;
+ }
+
+ if (SINGLE_SLASH.equals(execLocalPath)) {
+ logger.warn("The task execute file is '/', direct deletion is not allowed");
+ return;
+ }
+
+ try {
+ org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
+ logger.info("Success clear the task execute file: {}", execLocalPath);
+ } catch (IOException e) {
+ if (e instanceof NoSuchFileException) {
+ // this is expected
+ } else {
+ logger.error("Delete task execute file: {} failed, this will not affect the task status, but you need to clear this manually", execLocalPath, e);
+ }
+ }
+ } else {
+ logger.info("The current execute mode is develop mode, will not clear the task execute file: {}", execLocalPath);
+ }
+ }
+
+ public @NonNull TaskExecutionContext getTaskExecutionContext() {
+ return taskExecutionContext;
+ }
+
+ public @Nullable AbstractTask getTask() {
+ return task;
+ }
+
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java
new file mode 100644
index 0000000000..441662f4bc
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+public interface WorkerTaskExecuteRunnableFactory<T> {
+
+ T createWorkerTaskExecuteRunnable();
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java
new file mode 100644
index 0000000000..f3edfd4c1d
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import lombok.NonNull;
+import lombok.experimental.UtilityClass;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+
+import javax.annotation.Nullable;
+
+@UtilityClass
+public class WorkerTaskExecuteRunnableFactoryBuilder {
+
+ public static WorkerDelayTaskExecuteRunnableFactory<?> createWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext,
+ @NonNull WorkerConfig workerConfig,
+ @NonNull String workflowMasterAddress,
+ @NonNull WorkerMessageSender workerMessageSender,
+ @NonNull AlertClientService alertClientService,
+ @NonNull TaskPluginManager taskPluginManager,
+ @Nullable StorageOperate storageOperate) {
+ return new DefaultWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext,
+ workerConfig,
+ workflowMasterAddress,
+ workerMessageSender,
+ alertClientService,
+ taskPluginManager,
+ storageOperate);
+ }
+
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java
new file mode 100644
index 0000000000..50ac13f331
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class TaskExecutionCheckerUtils {
+
+ public static void checkTenantExist(WorkerConfig workerConfig, TaskExecutionContext taskExecutionContext) {
+ try {
+ boolean osUserExistFlag;
+ // if Using distributed is true and Currently supported systems are linux,Should not let it
+ // automatically
+ // create tenants,so TenantAutoCreate has no effect
+ if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) {
+ // use the id command to judge in linux
+ osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode());
+ } else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
+ // if not exists this user, then create
+ OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
+ osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
+ } else {
+ osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
+ }
+ if (!osUserExistFlag) {
+ throw new TaskException(String.format("TenantCode: %s doesn't exist", taskExecutionContext.getTenantCode()));
+ }
+ } catch (TaskException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new TaskException(String.format("TenantCode: %s doesn't exist", taskExecutionContext.getTenantCode()));
+ }
+ }
+
+ public static void createProcessLocalPathIfAbsent(TaskExecutionContext taskExecutionContext) throws TaskException {
+ try {
+ // local execute path
+ String execLocalPath = FileUtils.getProcessExecDir(
+ taskExecutionContext.getProjectCode(),
+ taskExecutionContext.getProcessDefineCode(),
+ taskExecutionContext.getProcessDefineVersion(),
+ taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId());
+ taskExecutionContext.setExecutePath(execLocalPath);
+ FileUtils.createWorkDirIfAbsent(execLocalPath);
+ } catch (Throwable ex) {
+ throw new TaskException("Cannot create process execute dir", ex);
+ }
+ }
+
+ public static void downloadResourcesIfNeeded(StorageOperate storageOperate, TaskExecutionContext taskExecutionContext, Logger logger) {
+ String execLocalPath = taskExecutionContext.getExecutePath();
+ Map<String, String> projectRes = taskExecutionContext.getResources();
+ if (MapUtils.isEmpty(projectRes)) {
+ return;
+ }
+ List<Pair<String, String>> downloadFiles = new ArrayList<>();
+ projectRes.forEach((key, value) -> {
+ File resFile = new File(execLocalPath, key);
+ boolean notExist = !resFile.exists();
+ if (notExist) {
+ downloadFiles.add(Pair.of(key, value));
+ } else {
+ logger.info("file : {} exists ", resFile.getName());
+ }
+ });
+ if (!downloadFiles.isEmpty() && !PropertyUtils.getResUploadStartupState()) {
+ throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
+ }
+
+ if (CollectionUtils.isNotEmpty(downloadFiles)) {
+ for (Pair<String, String> fileDownload : downloadFiles) {
+ try {
+ // query the tenant code of the resource according to the name of the resource
+ String fullName = fileDownload.getLeft();
+ String tenantCode = fileDownload.getRight();
+ String resPath = storageOperate.getResourceFileName(tenantCode, fullName);
+ logger.info("get resource file from path:{}", resPath);
+ long resourceDownloadStartTime = System.currentTimeMillis();
+ storageOperate.download(tenantCode, resPath, execLocalPath + File.separator + fullName, false, true);
+ WorkerServerMetrics
+ .recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);
+ WorkerServerMetrics.recordWorkerResourceDownloadSize(
+ Files.size(Paths.get(execLocalPath, fullName)));
+ WorkerServerMetrics.incWorkerResourceDownloadSuccessCount();
+ } catch (Exception e) {
+ WorkerServerMetrics.incWorkerResourceDownloadFailureCount();
+ throw new TaskException(String.format("Download resource file: %s error", fileDownload), e);
+ }
+ }
+ }
+ }
+}
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
index e66563c0b4..0bf8a26c45 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
@@ -17,156 +17,76 @@
package org.apache.dolphinscheduler.server.worker.processor;
+import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.common.utils.FileUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
-import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-
-import java.util.Date;
-import java.util.concurrent.ExecutorService;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.util.Date;
/**
* test task execute processor
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({SpringApplicationContext.class, WorkerConfig.class, FileUtils.class, JsonSerializer.class,
- JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
-@Ignore
public class TaskDispatchProcessorTest {
- private TaskExecutionContext taskExecutionContext;
-
- private WorkerMessageSender workerMessageSender;
-
- private ExecutorService workerExecService;
-
- private StorageOperate storageOperate;
+ @InjectMocks
+ private TaskDispatchProcessor taskDispatchProcessor;
+ @Mock
private WorkerConfig workerConfig;
- private Command command;
+ @Mock
+ private WorkerMessageSender workerMessageSender;
- private Command ackCommand;
+ @Mock
+ private AlertClientService alertClientService;
- private TaskDispatchCommand taskRequestCommand;
+ @Mock
+ private TaskPluginManager taskPluginManager;
- private AlertClientService alertClientService;
+ @Mock
+ private WorkerManagerThread workerManagerThread;
- private WorkerManagerThread workerManager;
-
- @Before
- public void before() throws Exception {
- // init task execution context
- taskExecutionContext = getTaskExecutionContext();
- workerConfig = new WorkerConfig();
- workerConfig.setExecThreads(1);
- workerConfig.setListenPort(1234);
- command = new Command();
- command.setType(CommandType.TASK_DISPATCH_REQUEST);
- ackCommand = new TaskExecuteRunningCommand("127.0.0.1:1234",
- "127.0.0.1:5678",
- System.currentTimeMillis()).convert2Command();
- taskRequestCommand = new TaskDispatchCommand(taskExecutionContext,
- "127.0.0.1:5678",
- "127.0.0.1:1234",
- System.currentTimeMillis());
- alertClientService = PowerMockito.mock(AlertClientService.class);
- workerExecService = PowerMockito.mock(ExecutorService.class);
- PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class))).thenReturn(null);
-
- PowerMockito.mockStatic(ChannelUtils.class);
- PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
-
- workerMessageSender = PowerMockito.mock(WorkerMessageSender.class);
-
- PowerMockito.mockStatic(SpringApplicationContext.class);
- PowerMockito.when(SpringApplicationContext.getBean(WorkerMessageSender.class)).thenReturn(workerMessageSender);
- PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig);
-
- workerManager = PowerMockito.mock(WorkerManagerThread.class);
-
- storageOperate = PowerMockito.mock(StorageOperate.class);
- PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext,
- "127.0.0.1:5678",
- workerMessageSender,
- alertClientService,
- storageOperate))).thenReturn(Boolean.TRUE);
-
- PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)).thenReturn(workerManager);
-
- PowerMockito.mockStatic(ThreadUtils.class);
- PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread",
- workerConfig.getExecThreads())).thenReturn(
- workerExecService);
-
- PowerMockito.mockStatic(JsonSerializer.class);
- PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskDispatchCommand.class)).thenReturn(
- taskRequestCommand);
-
- PowerMockito.mockStatic(JSONUtils.class);
- PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class)).thenReturn(
- taskRequestCommand);
-
- PowerMockito.mockStatic(FileUtils.class);
- PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
- taskExecutionContext.getProcessDefineCode(),
- taskExecutionContext.getProcessDefineVersion(),
- taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId())).thenReturn(
- taskExecutionContext.getExecutePath());
- PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
-
- SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(new TaskExecutionContext(),
- workerMessageSender,
- "127.0.0.1:5678",
- LoggerFactory.getLogger(
- TaskDispatchProcessorTest.class),
- alertClientService,
- storageOperate);
- PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments().thenReturn(simpleTaskExecuteThread);
- }
+ @Mock
+ private StorageOperate storageOperate;
@Test
- public void testNormalExecution() {
- TaskDispatchProcessor processor = new TaskDispatchProcessor();
- processor.process(null, command);
+ public void process() {
+ Channel channel = Mockito.mock(Channel.class);
+ TaskChannel taskChannel = Mockito.mock(TaskChannel.class);
+ Mockito.when(taskPluginManager.getTaskChannel(Mockito.anyString())).thenReturn(taskChannel);
- Assert.assertEquals(TaskExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
+ TaskExecutionContext taskExecutionContext = getTaskExecutionContext();
+ Command dispatchCommand = createDispatchCommand(taskExecutionContext);
+ taskDispatchProcessor.process(channel, dispatchCommand);
+
+ Mockito.verify(workerManagerThread, Mockito.atMostOnce()).offer(Mockito.any());
+ Mockito.verify(workerMessageSender, Mockito.never()).sendMessageWithRetry(taskExecutionContext, "localhost:5678", CommandType.TASK_REJECT);
}
- @Test
- public void testDelayExecution() {
- taskExecutionContext.setDelayTime(1);
- TaskDispatchProcessor processor = new TaskDispatchProcessor();
- processor.process(null, command);
- Assert.assertEquals(TaskExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
+ public Command createDispatchCommand(TaskExecutionContext taskExecutionContext) {
+ return new TaskDispatchCommand(
+ taskExecutionContext,
+ "localhost:5678",
+ "localhost:1234",
+ System.currentTimeMillis()
+ ).convert2Command();
}
public TaskExecutionContext getTaskExecutionContext() {
@@ -184,21 +104,4 @@ public class TaskDispatchProcessorTest {
taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
return taskExecutionContext;
}
-
- private static class SimpleTaskExecuteThread extends TaskExecuteThread {
-
- public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext,
- WorkerMessageSender workerMessageSender,
- String masterAddress,
- Logger taskLogger,
- AlertClientService alertClientService,
- StorageOperate storageOperate) {
- super(taskExecutionContext, masterAddress, workerMessageSender, alertClientService, storageOperate);
- }
-
- @Override
- public void run() {
- //
- }
- }
}
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java
new file mode 100644
index 0000000000..39f03119ed
--- /dev/null
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.mockito.Mockito;
+
+import java.util.Date;
+
+public class DefaultWorkerDelayTaskExecuteRunnableTest {
+
+ private TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
+
+ private WorkerConfig workerConfig = Mockito.mock(WorkerConfig.class);
+
+ private String masterAddress = "localhost:5678";
+
+ private WorkerMessageSender workerMessageSender = Mockito.mock(WorkerMessageSender.class);
+
+ private AlertClientService alertClientService = Mockito.mock(AlertClientService.class);
+
+ private TaskPluginManager taskPluginManager = Mockito.mock(TaskPluginManager.class);
+
+ private StorageOperate storageOperate = Mockito.mock(StorageOperate.class);
+
+ @Test
+ public void testDryRun() {
+ TaskExecutionContext taskExecutionContext = TaskExecutionContext.builder()
+ .dryRun(Constants.DRY_RUN_FLAG_YES)
+ .taskInstanceId(0)
+ .processDefineId(0)
+ .firstSubmitTime(new Date())
+ .taskLogName("TestLogName")
+ .build();
+ WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new DefaultWorkerDelayTaskExecuteRunnable(
+ taskExecutionContext,
+ workerConfig,
+ masterAddress,
+ workerMessageSender,
+ alertClientService,
+ taskPluginManager,
+ storageOperate
+ );
+
+ Assertions.assertAll(workerTaskExecuteRunnable::run);
+ Assertions.assertEquals(TaskExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus());
+ }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
deleted file mode 100644
index bd492c7ff6..0000000000
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(PowerMockRunner.class)
-public class TaskExecuteThreadTest {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRegistryClientTest.class);
-
- @Mock
- private TaskExecutionContext taskExecutionContext;
-
- @Mock
- private WorkerMessageSender workerMessageSender;
-
- @Mock
- private AlertClientService alertClientService;
-
- @Mock
- private StorageOperate storageOperate;
-
- @Mock
- private TaskPluginManager taskPluginManager;
-
- @Test
- public void checkTest() {
- TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext,
- "127.0.0.1:5678",
- workerMessageSender,
- alertClientService,
- taskPluginManager,
- storageOperate);
-
- String path = "/";
- Map<String, String> projectRes = new HashMap<>();
- projectRes.put("shell", "shell.sh");
- List<Pair<String, String>> downloads = new ArrayList<>();
- try {
- downloads = taskExecuteThread.downloadCheck(path, projectRes);
- } catch (Exception e) {
- Assert.assertNotNull(e);
- }
- downloads.add(Pair.of("shell", "shell.sh"));
- try{
- taskExecuteThread.downloadResource(path, LOGGER, downloads);
- }catch (Exception e){
-
- }
- }
-}