You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/06/08 11:42:16 UTC
[dolphinscheduler] branch dev updated: [Feature-6758][Task] Add limit resource usage for tasks base on cgroup (#10373)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b1fb17119b [Feature-6758][Task] Add limit resource usage for tasks base on cgroup (#10373)
b1fb17119b is described below
commit b1fb17119b7e929792933040e26c5013313a43f7
Author: xiangzihao <46...@qq.com>
AuthorDate: Wed Jun 8 19:42:05 2022 +0800
[Feature-6758][Task] Add limit resource usage for tasks base on cgroup (#10373)
---
docs/docs/en/architecture/configuration.md | 1 +
docs/docs/en/guide/task/datax.md | 2 +
docs/docs/en/guide/task/jupyter.md | 2 +
docs/docs/en/guide/task/python.md | 2 +
docs/docs/en/guide/task/shell.md | 2 +
docs/docs/zh/architecture/configuration.md | 1 +
docs/docs/zh/guide/task/datax.md | 2 +
docs/docs/zh/guide/task/jupyter.md | 4 +-
docs/docs/zh/guide/task/python.md | 2 +
docs/docs/zh/guide/task/shell.md | 2 +
.../dolphinscheduler/common/model/TaskNode.java | 26 +++++++++++++
.../src/main/resources/common.properties | 5 ++-
.../dao/entity/TaskDefinition.java | 32 ++++++++++++++-
.../dao/entity/TaskDefinitionLog.java | 2 +
.../dolphinscheduler/dao/entity/TaskInstance.java | 26 +++++++++++++
.../dao/mapper/TaskDefinitionLogMapper.xml | 8 ++--
.../dao/mapper/TaskDefinitionMapper.xml | 2 +-
.../dao/mapper/TaskInstanceMapper.xml | 2 +-
.../src/main/resources/sql/dolphinscheduler_h2.sql | 6 +++
.../main/resources/sql/dolphinscheduler_mysql.sql | 6 +++
.../resources/sql/dolphinscheduler_postgresql.sql | 6 +++
.../3.0.0_schema/mysql/dolphinscheduler_ddl.sql | 9 +++++
.../postgresql/dolphinscheduler_ddl.sql | 9 +++++
.../resources/docker/file-manage/common.properties | 3 ++
.../builder/TaskExecutionContextBuilder.java | 2 +
.../master/runner/WorkflowExecuteThread.java | 4 ++
.../service/process/ProcessServiceImpl.java | 2 +
.../dolphinscheduler-task-api/pom.xml | 5 +++
.../plugin/task/api/AbstractCommandExecutor.java | 45 ++++++++++++++++++++--
.../plugin/task/api/TaskExecutionContext.java | 26 +++++++++++++
.../utils/AbstractCommandExecutorConstants.java | 27 +++++++++++++
.../plugin/task/api/utils/OSUtils.java | 10 +++++
dolphinscheduler-ui/src/locales/en_US/project.ts | 2 +
dolphinscheduler-ui/src/locales/zh_CN/project.ts | 2 +
.../projects/task/components/node/fields/index.ts | 1 +
.../components/node/fields/use-resource-limit.ts | 45 ++++++++++++++++++++++
.../projects/task/components/node/format-data.ts | 4 +-
.../task/components/node/tasks/use-datax.ts | 3 ++
.../task/components/node/tasks/use-jupyter.ts | 3 ++
.../task/components/node/tasks/use-python.ts | 3 ++
.../task/components/node/tasks/use-sea-tunnel.ts | 3 ++
.../task/components/node/tasks/use-shell.ts | 3 ++
.../task/components/node/tasks/use-sqoop.ts | 3 ++
.../views/projects/task/components/node/types.ts | 2 +
44 files changed, 345 insertions(+), 12 deletions(-)
diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md
index 37718319b9..604884f203 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -202,6 +202,7 @@ yarn.resourcemanager.ha.rm.ids||specify the yarn resourcemanager url. if resourc
yarn.application.status.address|http://ds1:8088/ws/v1/cluster/apps/%s|keep default if ResourceManager supports HA or not use ResourceManager, or replace ds1 with corresponding hostname if ResourceManager in standalone mode
dolphinscheduler.env.path|env/dolphinscheduler_env.sh|load environment variables configs [eg: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...]
development.state|false| specify whether in development state
+task.resource.limit.state|false|specify whether in resource limit state
### application-api.properties [API-service log config]
diff --git a/docs/docs/en/guide/task/datax.md b/docs/docs/en/guide/task/datax.md
index d537817759..a57656688f 100644
--- a/docs/docs/en/guide/task/datax.md
+++ b/docs/docs/en/guide/task/datax.md
@@ -19,6 +19,8 @@ DataX task type for executing DataX programs. For DataX nodes, the worker will e
- **Environment Name**: Configure the environment name in which to run the script.
- **Number of failed retry attempts**: The number of times the task failed to be resubmitted.
- **Failed retry interval**: The time, in cents, interval for resubmitting the task after a failed task.
+- **Cpu quota**: Assign the specified CPU time quota to the task executed. Takes a percentage value. Default -1 means unlimited. For example, the full CPU load of one core is 100%,and that of 16 cores is 1600%. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md)
+- **Max memory**:Assign the specified max memory to the task executed. Exceeding this limit will trigger oom to be killed and will not automatically retry. Takes an MB value. Default -1 means unlimited. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md)
- **Delayed execution time**: The time, in cents, that a task is delayed in execution.
- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will be sent and the task execution will fail.
- **Custom template**: Custom the content of the DataX node's json profile when the default data source provided does not meet the required requirements.
diff --git a/docs/docs/en/guide/task/jupyter.md b/docs/docs/en/guide/task/jupyter.md
index 92d614d65f..94208bbe10 100644
--- a/docs/docs/en/guide/task/jupyter.md
+++ b/docs/docs/en/guide/task/jupyter.md
@@ -25,6 +25,8 @@ Click [here](https://docs.conda.io/en/latest/) for more information about `conda
- Worker grouping: Assign tasks to the machines of the worker group to execute. If `Default` is selected, randomly select a worker machine for execution.
- Number of failed retry attempts: The failure task resubmitting times. It supports drop-down and hand-filling.
- Failed retry interval: The time interval for resubmitting the task after a failed task. It supports drop-down and hand-filling.
+- Cpu quota: Assign the specified CPU time quota to the task executed. Takes a percentage value. Default -1 means unlimited. For example, the full CPU load of one core is 100%,and that of 16 cores is 1600%. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md)
+- Max memory:Assign the specified max memory to the task executed. Exceeding this limit will trigger oom to be killed and will not automatically retry. Takes an MB value. Default -1 means unlimited. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md)
- Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will send and the task execution will fail.
- Conda Env Name: Name of conda environment.
- Input Note Path: Path of input jupyter note template.
diff --git a/docs/docs/en/guide/task/python.md b/docs/docs/en/guide/task/python.md
index 6d0a376696..c54a13c6ee 100644
--- a/docs/docs/en/guide/task/python.md
+++ b/docs/docs/en/guide/task/python.md
@@ -20,6 +20,8 @@ it will generate a temporary python script, and executes the script by the Linux
- Environment Name: Configure the environment name in which to run the script.
- Number of failed retry attempts: The failure task resubmitting times. It supports drop-down and hand-filling.
- Failed retry interval: The time interval for resubmitting the task after a failed task. It supports drop-down and hand-filling.
+- Cpu quota: Assign the specified CPU time quota to the task executed. Takes a percentage value. Default -1 means unlimited. For example, the full CPU load of one core is 100%,and that of 16 cores is 1600%. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md)
+- Max memory:Assign the specified max memory to the task executed. Exceeding this limit will trigger oom to be killed and will not automatically retry. Takes an MB value. Default -1 means unlimited. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md)
- Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will send and the task execution will fail.
- Script: Python program developed by the user.
- Resource: Refers to the list of resource files that need to be called in the script, and the files uploaded or created by the resource center-file management.
diff --git a/docs/docs/en/guide/task/shell.md b/docs/docs/en/guide/task/shell.md
index e397df7b01..3119cfae13 100644
--- a/docs/docs/en/guide/task/shell.md
+++ b/docs/docs/en/guide/task/shell.md
@@ -19,6 +19,8 @@ Shell task used to create a shell task type and execute a series of shell script
- Environment Name: Configure the environment name in which run the script.
- Times of failed retry attempts: The number of times the task failed to resubmit. You can select from drop-down or fill-in a number.
- Failed retry interval: The time interval for resubmitting the task after a failed task. You can select from drop-down or fill-in a number.
+- Cpu quota: Assign the specified CPU time quota to the task executed. Takes a percentage value. Default -1 means unlimited. For example, the full CPU load of one core is 100%,and that of 16 cores is 1600%. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md)
+- Max memory:Assign the specified max memory to the task executed. Exceeding this limit will trigger oom to be killed and will not automatically retry. Takes an MB value. Default -1 means unlimited. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md)
- Timeout alarm: Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm email will send and the task execution will fail.
- Script: Shell program developed by users.
- Resource: Refers to the list of resource files that called in the script, and upload or create files by the Resource Center file management.
diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md
index 94cfa57c9e..1ea4cc5ad3 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -194,6 +194,7 @@ yarn.resourcemanager.ha.rm.ids||yarn resourcemanager 地址, 如果resourcemanag
yarn.application.status.address|http://ds1:8088/ws/v1/cluster/apps/%s|如果resourcemanager开启了HA或者没有使用resourcemanager,保持默认值即可. 如果resourcemanager为单节点,你需要将ds1 配置为resourcemanager对应的hostname
dolphinscheduler.env.path|env/dolphinscheduler_env.sh|运行脚本加载环境变量配置文件[如: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...]
development.state|false|是否处于开发模式
+task.resource.limit.state|false|是否启用资源限制模式
## 5.application-api.properties [API服务配置]
diff --git a/docs/docs/zh/guide/task/datax.md b/docs/docs/zh/guide/task/datax.md
index 7f9248e9dd..49340db726 100644
--- a/docs/docs/zh/guide/task/datax.md
+++ b/docs/docs/zh/guide/task/datax.md
@@ -19,6 +19,8 @@ DataX 任务类型,用于执行 DataX 程序。对于 DataX 节点,worker
- 环境名称:配置运行脚本的环境。
- 失败重试次数:任务失败重新提交的次数。
- 失败重试间隔:任务失败重新提交任务的时间间隔,以分为单位。
+- Cpu 配额: 为执行的任务分配指定的CPU时间配额,单位百分比,默认-1代表不限制,例如1个核心的CPU满载是100%,16个核心的是1600%。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
+- 最大内存:为执行的任务分配指定的内存大小,超过会触发OOM被Kill同时不会进行自动重试,单位MB,默认-1代表不限制。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
- 延时执行时间:任务延迟执行的时间,以分为单位。
- 超时警告:勾选超时警告、超时失败,当任务超过“超时时长”后,会发送告警邮件并且任务执行失败。
- 自定义模板:当默认提供的数据源不满足所需要求的时,可自定义 datax 节点的 json 配置文件内容。
diff --git a/docs/docs/zh/guide/task/jupyter.md b/docs/docs/zh/guide/task/jupyter.md
index bcde5122df..1372c843b3 100644
--- a/docs/docs/zh/guide/task/jupyter.md
+++ b/docs/docs/zh/guide/task/jupyter.md
@@ -26,7 +26,9 @@
- Worker分组:任务分配给worker组的机器机执行,选择Default,会随机选择一台worker机执行。
- 失败重试次数:任务失败重新提交的次数,支持下拉和手填。
- 失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。
-- 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
+- Cpu 配额: 为执行的任务分配指定的CPU时间配额,单位百分比,默认-1代表不限制,例如1个核心的CPU满载是100%,16个核心的是1600%。
+- 最大内存:为执行的任务分配指定的内存大小,超过会触发OOM被Kill同时不会进行自动重试,单位MB,默认-1代表不限制。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
+- 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
- 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
- Conda Env Name: Conda环境名称。
- Input Note Path: 输入的jupyter note模板路径。
diff --git a/docs/docs/zh/guide/task/python.md b/docs/docs/zh/guide/task/python.md
index d7e2b9561e..c389618cd3 100644
--- a/docs/docs/zh/guide/task/python.md
+++ b/docs/docs/zh/guide/task/python.md
@@ -20,6 +20,8 @@ Python 任务类型,用于创建 Python 类型的任务并执行一系列的 P
- 环境名称:配置运行脚本的环境。
- 失败重试次数:任务失败重新提交的次数,支持下拉和手填。
- 失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。
+- Cpu 配额: 为执行的任务分配指定的CPU时间配额,单位百分比,默认-1代表不限制,例如1个核心的CPU满载是100%,16个核心的是1600%。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
+- 最大内存:为执行的任务分配指定的内存大小,超过会触发OOM被Kill同时不会进行自动重试,单位MB,默认-1代表不限制。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
- 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
- 脚本:用户开发的PYTHON程序。
- 资源:是指脚本中需要调用的资源文件列表,资源中心-文件管理上传或创建的文件。
diff --git a/docs/docs/zh/guide/task/shell.md b/docs/docs/zh/guide/task/shell.md
index 79c7e4fab1..d50a3d04d2 100644
--- a/docs/docs/zh/guide/task/shell.md
+++ b/docs/docs/zh/guide/task/shell.md
@@ -19,6 +19,8 @@ Shell 任务类型,用于创建 Shell 类型的任务并执行一系列的 She
- 环境名称:配置运行脚本的环境。
- 失败重试次数:任务失败重新提交的次数,支持下拉和手填。
- 失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。
+- Cpu 配额: 为执行的任务分配指定的CPU时间配额,单位百分比,默认-1代表不限制,例如1个核心的CPU满载是100%,16个核心的是1600%。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
+- 最大内存:为执行的任务分配指定的内存大小,超过会触发OOM被Kill同时不会进行自动重试,单位MB,默认-1代表不限制。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
- 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
- 脚本:用户开发的 SHELL 程序。
- 资源:是指脚本中需要调用的资源文件列表,资源中心-文件管理上传或创建的文件。
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
index a9601a6fec..6999ac3baf 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
@@ -178,6 +178,16 @@ public class TaskNode {
*/
private int delayTime;
+ /**
+ * cpu quota
+ */
+ private Integer cpuQuota;
+
+ /**
+ * max memory
+ */
+ private Integer memoryMax;
+
public String getId() {
return id;
}
@@ -497,4 +507,20 @@ public class TaskNode {
public void setTaskGroupPriority(int taskGroupPriority) {
this.taskGroupPriority = taskGroupPriority;
}
+
+ public Integer getCpuQuota() {
+ return cpuQuota == null ? -1 : cpuQuota;
+ }
+
+ public void setCpuQuota(Integer cpuQuota) {
+ this.cpuQuota = cpuQuota;
+ }
+
+ public Integer getMemoryMax() {
+ return memoryMax == null ? -1 : memoryMax;
+ }
+
+ public void setMemoryMax(Integer memoryMax) {
+ this.memoryMax = memoryMax;
+ }
}
diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties
index 76c98d78ee..13c7d3a479 100644
--- a/dolphinscheduler-common/src/main/resources/common.properties
+++ b/dolphinscheduler-common/src/main/resources/common.properties
@@ -95,4 +95,7 @@ alert.rpc.port=50052
zeppelin.rest.url=http://localhost:8080
# set path of conda.sh
-conda.path=/opt/anaconda3/etc/profile.d/conda.sh
\ No newline at end of file
+conda.path=/opt/anaconda3/etc/profile.d/conda.sh
+
+# Task resource limit state
+task.resource.limit.state=false
\ No newline at end of file
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index 0206719a35..c908764490 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -201,6 +201,16 @@ public class TaskDefinition {
*/
private int taskGroupPriority;
+ /**
+ * cpu quota
+ */
+ private Integer cpuQuota;
+
+ /**
+ * max memory
+ */
+ private Integer memoryMax;
+
public TaskDefinition() {
}
@@ -457,6 +467,22 @@ public class TaskDefinition {
this.environmentCode = environmentCode;
}
+ public Integer getCpuQuota() {
+ return cpuQuota == null ? -1 : cpuQuota;
+ }
+
+ public void setCpuQuota(Integer cpuQuota) {
+ this.cpuQuota = cpuQuota;
+ }
+
+ public Integer getMemoryMax() {
+ return memoryMax == null ? -1 : memoryMax;
+ }
+
+ public void setMemoryMax(Integer memoryMax) {
+ this.memoryMax = memoryMax;
+ }
+
@Override
public boolean equals(Object o) {
if (o == null) {
@@ -481,7 +507,9 @@ public class TaskDefinition {
|| ("".equals(that.resourceIds) && resourceIds == null))
&& environmentCode == that.environmentCode
&& taskGroupId == that.taskGroupId
- && taskGroupPriority == that.taskGroupPriority;
+ && taskGroupPriority == that.taskGroupPriority
+ && Objects.equals(cpuQuota, that.cpuQuota)
+ && Objects.equals(memoryMax, that.memoryMax);
}
@Override
@@ -513,6 +541,8 @@ public class TaskDefinition {
+ ", timeout=" + timeout
+ ", delayTime=" + delayTime
+ ", resourceIds='" + resourceIds + '\''
+ + ", cpuQuota=" + cpuQuota
+ + ", memoryMax=" + memoryMax
+ ", createTime=" + createTime
+ ", updateTime=" + updateTime
+ '}';
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
index 7301148e3e..f070fdb165 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
@@ -70,6 +70,8 @@ public class TaskDefinitionLog extends TaskDefinition {
this.setFailRetryTimes(taskDefinition.getFailRetryTimes());
this.setFlag(taskDefinition.getFlag());
this.setModifyBy(taskDefinition.getModifyBy());
+ this.setCpuQuota(taskDefinition.getCpuQuota());
+ this.setMemoryMax(taskDefinition.getMemoryMax());
}
public int getOperator() {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 4ec1dfde2b..fc66bfd4cd 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -279,6 +279,16 @@ public class TaskInstance implements Serializable {
*/
private int taskGroupId;
+ /**
+ * cpu quota
+ */
+ private Integer cpuQuota;
+
+ /**
+ * max memory
+ */
+ private Integer memoryMax;
+
public void init(String host, Date startTime, String executePath) {
this.host = host;
this.startTime = startTime;
@@ -753,4 +763,20 @@ public class TaskInstance implements Serializable {
public void setTaskGroupPriority(int taskGroupPriority) {
this.taskGroupPriority = taskGroupPriority;
}
+
+ public Integer getCpuQuota() {
+ return cpuQuota == null ? -1 : cpuQuota;
+ }
+
+ public void setCpuQuota(Integer cpuQuota) {
+ this.cpuQuota = cpuQuota;
+ }
+
+ public Integer getMemoryMax() {
+ return memoryMax == null ? -1 : memoryMax;
+ }
+
+ public void setMemoryMax(Integer memoryMax) {
+ this.memoryMax = memoryMax;
+ }
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
index 14314d0fd9..16dc698d26 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
@@ -21,7 +21,7 @@
<sql id="baseSql">
id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
- resource_ids, operator, operate_time, create_time, update_time,task_group_id,task_group_priority
+ resource_ids, operator, operate_time, create_time, update_time, task_group_id, task_group_priority, cpu_quota, memory_max
</sql>
<select id="queryMaxVersionForDefinition" resultType="java.lang.Integer">
select max(version)
@@ -51,7 +51,8 @@
<insert id="batchInsert">
insert into t_ds_task_definition_log (code, name, version, description, project_code, user_id,
task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,
- timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time, update_time,task_group_id,task_group_priority)
+ timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time,
+ update_time, task_group_id, task_group_priority, cpu_quota, memory_max)
values
<foreach collection="taskDefinitionLogs" item="taskDefinitionLog" separator=",">
(#{taskDefinitionLog.code},#{taskDefinitionLog.name},#{taskDefinitionLog.version},#{taskDefinitionLog.description},
@@ -59,7 +60,8 @@
#{taskDefinitionLog.flag},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.environmentCode},
#{taskDefinitionLog.failRetryTimes},#{taskDefinitionLog.failRetryInterval},#{taskDefinitionLog.timeoutFlag},#{taskDefinitionLog.timeoutNotifyStrategy},
#{taskDefinitionLog.timeout},#{taskDefinitionLog.delayTime},#{taskDefinitionLog.resourceIds},#{taskDefinitionLog.operator},#{taskDefinitionLog.operateTime},
- #{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}, #{taskDefinitionLog.taskGroupId},#{taskDefinitionLog.taskGroupPriority})
+ #{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}, #{taskDefinitionLog.taskGroupId},#{taskDefinitionLog.taskGroupPriority},
+ #{taskDefinitionLog.cpuQuota},#{taskDefinitionLog.memoryMax})
</foreach>
</insert>
<delete id="deleteByCodeAndVersion">
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
index 5c889d1b07..c78dd5a449 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
@@ -21,7 +21,7 @@
<sql id="baseSql">
id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
- resource_ids, create_time, update_time, task_group_id,task_group_priority
+ resource_ids, create_time, update_time, task_group_id,task_group_priority, cpu_quota, memory_max
</sql>
<select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index adc470c197..26fcd4b0f0 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -22,7 +22,7 @@
id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id,
- first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id
+ first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id, cpu_quota, memory_max
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index 5623b40d99..5f24d966da 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -486,6 +486,8 @@ CREATE TABLE t_ds_task_definition
delay_time int(11) DEFAULT '0',
task_group_id int(11) DEFAULT NULL,
task_group_priority tinyint(4) DEFAULT '0',
+ cpu_quota int(11) DEFAULT '-1' NOT NULL,
+ memory_max int(11) DEFAULT '-1' NOT NULL,
resource_ids text,
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
@@ -521,6 +523,8 @@ CREATE TABLE t_ds_task_definition_log
operator int(11) DEFAULT NULL,
task_group_id int(11) DEFAULT NULL,
task_group_priority tinyint(4) DEFAULT '0',
+ cpu_quota int(11) DEFAULT '-1' NOT NULL,
+ memory_max int(11) DEFAULT '-1' NOT NULL,
operate_time datetime DEFAULT NULL,
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
@@ -860,6 +864,8 @@ CREATE TABLE t_ds_task_instance
task_group_id int(11) DEFAULT NULL,
var_pool longtext,
dry_run int NULL DEFAULT 0,
+ cpu_quota int(11) DEFAULT '-1' NOT NULL,
+ memory_max int(11) DEFAULT '-1' NOT NULL,
PRIMARY KEY (id),
FOREIGN KEY (process_instance_id) REFERENCES t_ds_process_instance (id) ON DELETE CASCADE
);
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index 40323aa75a..2bbf3c2055 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -487,6 +487,8 @@ CREATE TABLE `t_ds_task_definition` (
`resource_ids` text COMMENT 'resource id, separated by comma',
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
`task_group_priority` tinyint(4) DEFAULT '0' COMMENT 'task group priority',
+ `cpu_quota` int(11) DEFAULT '-1' NOT NULL COMMENT 'cpuQuota(%): -1:Infinity',
+ `memory_max` int(11) DEFAULT '-1' NOT NULL COMMENT 'MemoryMax(MB): -1:Infinity',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`,`code`)
@@ -521,6 +523,8 @@ CREATE TABLE `t_ds_task_definition_log` (
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
`task_group_priority` tinyint(4) DEFAULT 0 COMMENT 'task group priority',
`operate_time` datetime DEFAULT NULL COMMENT 'operate time',
+ `cpu_quota` int(11) DEFAULT '-1' NOT NULL COMMENT 'cpuQuota(%): -1:Infinity',
+ `memory_max` int(11) DEFAULT '-1' NOT NULL COMMENT 'MemoryMax(MB): -1:Infinity',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`),
@@ -853,6 +857,8 @@ CREATE TABLE `t_ds_task_instance` (
`var_pool` longtext COMMENT 'var_pool',
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag: 0 normal, 1 dry run',
+ `cpu_quota` int(11) DEFAULT '-1' NOT NULL COMMENT 'cpuQuota(%): -1:Infinity',
+ `memory_max` int(11) DEFAULT '-1' NOT NULL COMMENT 'MemoryMax(MB): -1:Infinity',
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
KEY `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE,
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
index 3efee9a53b..fabfda8bb3 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
@@ -404,6 +404,8 @@ CREATE TABLE t_ds_task_definition (
task_group_id int DEFAULT NULL,
task_group_priority int DEFAULT '0',
resource_ids text ,
+ cpu_quota int DEFAULT '-1' NOT NULL,
+ memory_max int DEFAULT '-1' NOT NULL,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
@@ -441,6 +443,8 @@ CREATE TABLE t_ds_task_definition_log (
task_group_id int DEFAULT NULL,
task_group_priority int DEFAULT '0',
operate_time timestamp DEFAULT NULL ,
+ cpu_quota int DEFAULT '-1' NOT NULL,
+ memory_max int DEFAULT '-1' NOT NULL,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
@@ -759,6 +763,8 @@ CREATE TABLE t_ds_task_instance (
task_group_id int DEFAULT NULL,
var_pool text ,
dry_run int DEFAULT '0' ,
+ cpu_quota int DEFAULT '-1' NOT NULL,
+ memory_max int DEFAULT '-1' NOT NULL,
PRIMARY KEY (id),
CONSTRAINT foreign_key_instance_id FOREIGN KEY(process_instance_id) REFERENCES t_ds_process_instance(id) ON DELETE CASCADE
) ;
diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/mysql/dolphinscheduler_ddl.sql
index 73fa5f7cd4..6939da6626 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/mysql/dolphinscheduler_ddl.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/mysql/dolphinscheduler_ddl.sql
@@ -80,6 +80,15 @@ ALTER TABLE `t_ds_alert` ADD COLUMN `warning_type` tinyint(4) DEFAULT '2' COMMEN
ALTER TABLE `t_ds_alert` ADD INDEX `idx_status` (`alert_status`) USING BTREE;
+-- Add resource limit column
+ALTER TABLE `t_ds_task_definition` ADD COLUMN `cpu_quota` int(11) DEFAULT '-1' NOT NULL COMMENT 'cpuQuota(%): -1:Infinity' AFTER `task_group_priority`;
+ALTER TABLE `t_ds_task_definition` ADD COLUMN `memory_max` int(11) DEFAULT '-1' NOT NULL COMMENT 'MemoryMax(MB): -1:Infinity' AFTER `cpu_quota`;
+ALTER TABLE `t_ds_task_definition_log` ADD COLUMN `cpu_quota` int(11) DEFAULT '-1' NOT NULL COMMENT 'cpuQuota(%): -1:Infinity' AFTER `operate_time`;
+ALTER TABLE `t_ds_task_definition_log` ADD COLUMN `memory_max` int(11) DEFAULT '-1' NOT NULL COMMENT 'MemoryMax(MB): -1:Infinity' AFTER `cpu_quota`;
+ALTER TABLE `t_ds_task_instance` ADD COLUMN `cpu_quota` int(11) DEFAULT '-1' NOT NULL COMMENT 'cpuQuota(%): -1:Infinity' AFTER `dry_run`;
+ALTER TABLE `t_ds_task_instance` ADD COLUMN `memory_max` int(11) DEFAULT '-1' NOT NULL COMMENT 'MemoryMax(MB): -1:Infinity' AFTER `cpu_quota`;
+
+
--
-- Table structure for table `t_ds_dq_comparison_type`
--
diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/postgresql/dolphinscheduler_ddl.sql
index 3de4b00d88..86471d71b1 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/postgresql/dolphinscheduler_ddl.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/postgresql/dolphinscheduler_ddl.sql
@@ -36,6 +36,15 @@ EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_resources ALTER COLUMN
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_alert ADD COLUMN IF NOT EXISTS sign varchar(40) NOT NULL DEFAULT '''' ';
EXECUTE 'comment on column ' || quote_ident(v_schema) ||'.t_ds_alert.sign is ''sign=sha1(content)''';
+-- Add resource limit column
+EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_definition ADD COLUMN IF NOT EXISTS cpu_quota int NOT NULL DEFAULT ''-1'' ';
+EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_definition ADD COLUMN IF NOT EXISTS memory_max int NOT NULL DEFAULT ''-1'' ';
+EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_definition_log ADD COLUMN IF NOT EXISTS cpu_quota int NOT NULL DEFAULT ''-1'' ';
+EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_definition_log ADD COLUMN IF NOT EXISTS memory_max int NOT NULL DEFAULT ''-1'' ';
+EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_instance ADD COLUMN IF NOT EXISTS cpu_quota int NOT NULL DEFAULT ''-1'' ';
+EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_instance ADD COLUMN IF NOT EXISTS memory_max int NOT NULL DEFAULT ''-1'' ';
+
+
return 'Success!';
exception when others then
---Raise EXCEPTION '(%)',SQLERRM;
diff --git a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties
index b6f5126d9b..dcf839ff76 100644
--- a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties
+++ b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties
@@ -85,3 +85,6 @@ aws.access.key.id=accessKey123
aws.secret.access.key=secretKey123
aws.region=us-east-1
aws.endpoint=http://s3:9000
+
+# Task resource limit state
+task.resource.limit.state=false
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 305af6b867..6c7154b45d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -63,6 +63,8 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setVarPool(taskInstance.getVarPool());
taskExecutionContext.setDryRun(taskInstance.getDryRun());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUBMITTED_SUCCESS);
+ taskExecutionContext.setCpuQuota(taskInstance.getCpuQuota());
+ taskExecutionContext.setMemoryMax(taskInstance.getMemoryMax());
return this;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index cb3f4fc362..6b70b25149 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -1177,6 +1177,10 @@ public class WorkflowExecuteThread {
taskInstance.setTaskGroupId(taskNode.getTaskGroupId());
taskInstance.setTaskGroupPriority(taskNode.getTaskGroupPriority());
+ //set task cpu quota and max memory
+ taskInstance.setCpuQuota(taskNode.getCpuQuota());
+ taskInstance.setMemoryMax(taskNode.getMemoryMax());
+
// task instance priority
if (taskNode.getTaskInstancePriority() == null) {
taskInstance.setTaskInstancePriority(Priority.MEDIUM);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 7690672f77..f44f9d5b82 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -2767,6 +2767,8 @@ public class ProcessServiceImpl implements ProcessService {
taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getCode).collect(Collectors.toList())));
taskNode.setTaskGroupId(taskDefinitionLog.getTaskGroupId());
taskNode.setTaskGroupPriority(taskDefinitionLog.getTaskGroupPriority());
+ taskNode.setCpuQuota(taskDefinitionLog.getCpuQuota());
+ taskNode.setMemoryMax(taskDefinitionLog.getMemoryMax());
taskNodeList.add(taskNode);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
index 000c49b474..adce339433 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
@@ -56,6 +56,11 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index e5e85a4332..17147c9ab6 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -21,8 +21,10 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import java.io.BufferedReader;
import java.io.File;
@@ -127,9 +129,13 @@ public abstract class AbstractCommandExecutor {
// if sudo.enable=true,setting up user to run commands
if (OSUtils.isSudoEnable()) {
- command.add("sudo");
- command.add("-u");
- command.add(taskRequest.getTenantCode());
+ if (OSUtils.isLinux() && PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE)) {
+ generateCgroupCommand(command);
+ } else {
+ command.add("sudo");
+ command.add("-u");
+ command.add(taskRequest.getTenantCode());
+ }
}
command.add(commandInterpreter());
command.addAll(Collections.emptyList());
@@ -142,6 +148,39 @@ public abstract class AbstractCommandExecutor {
printCommand(command);
}
+ /**
+ * generate systemd command.
+ * eg: sudo systemd-run -q --scope -p CPUQuota=100% -p MemoryMax=200M --uid=root
+ * @param command command
+ */
+ private void generateCgroupCommand(List<String> command) {
+ Integer cpuQuota = taskRequest.getCpuQuota();
+ Integer memoryMax = taskRequest.getMemoryMax();
+
+ command.add("sudo");
+ command.add("systemd-run");
+ command.add("-q");
+ command.add("--scope");
+
+ if (cpuQuota == -1) {
+ command.add("-p");
+ command.add("CPUQuota=");
+ } else {
+ command.add("-p");
+ command.add(String.format("CPUQuota=%s%%", taskRequest.getCpuQuota()));
+ }
+
+ if (memoryMax == -1) {
+ command.add("-p");
+ command.add(String.format("MemoryMax=%s", "infinity"));
+ } else {
+ command.add("-p");
+ command.add(String.format("MemoryMax=%sM", taskRequest.getMemoryMax()));
+ }
+
+ command.add(String.format("--uid=%s", taskRequest.getTenantCode()));
+ }
+
public TaskResponse run(String execCommand) throws IOException, InterruptedException {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index 022dbf1327..4e1958c671 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -239,6 +239,16 @@ public class TaskExecutionContext {
private DataQualityTaskExecutionContext dataQualityTaskExecutionContext;
+ /**
+ * cpu quota
+ */
+ private Integer cpuQuota;
+
+ /**
+ * max memory
+ */
+ private Integer memoryMax;
+
public String getTaskLogName() {
return taskLogName;
}
@@ -567,6 +577,22 @@ public class TaskExecutionContext {
this.endTime = endTime;
}
+ public Integer getCpuQuota() {
+ return cpuQuota;
+ }
+
+ public void setCpuQuota(Integer cpuQuota) {
+ this.cpuQuota = cpuQuota;
+ }
+
+ public Integer getMemoryMax() {
+ return memoryMax;
+ }
+
+ public void setMemoryMax(Integer memoryMax) {
+ this.memoryMax = memoryMax;
+ }
+
public K8sTaskExecutionContext getK8sTaskExecutionContext() {
return k8sTaskExecutionContext;
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/AbstractCommandExecutorConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/AbstractCommandExecutorConstants.java
new file mode 100644
index 0000000000..19d0e3d3d9
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/AbstractCommandExecutorConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+public class AbstractCommandExecutorConstants {
+
+ private AbstractCommandExecutorConstants() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static final String TASK_RESOURCE_LIMIT_STATE = "task.resource.limit.state";
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/OSUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/OSUtils.java
index 0c8b78df75..166d26eec2 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/OSUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/OSUtils.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.api.utils;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.plugin.task.api.ShellExecutor;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@@ -68,6 +69,15 @@ public class OSUtils {
return getOSName().startsWith("Windows");
}
+ /**
+ * whether is linux
+ *
+ * @return true if linux
+ */
+ public static boolean isLinux() {
+ return SystemUtils.IS_OS_LINUX;
+ }
+
/**
* Execute the corresponding command of Linux or Windows
*
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index 993b00442a..9c3fe14d54 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -308,6 +308,8 @@ export default {
task_group_name: 'Task group name',
task_group_queue_priority: 'Priority',
number_of_failed_retries: 'Number of failed retries',
+ cpu_quota: 'CPU quota',
+ memory_max: 'Max memory',
times: 'Times',
failed_retry_interval: 'Failed retry interval',
minute: 'Minute',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index 897ae3818d..6b12c99927 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -308,6 +308,8 @@ export default {
task_group_name: '任务组名称',
task_group_queue_priority: '组内优先级',
number_of_failed_retries: '失败重试次数',
+ cpu_quota: 'CPU配额',
+ memory_max: '最大内存',
times: '次',
failed_retry_interval: '失败重试间隔',
minute: '分',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
index bc3a2c61dd..89cd27ba83 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
@@ -23,6 +23,7 @@ export { useWorkerGroup } from './use-worker-group'
export { useEnvironmentName } from './use-environment-name'
export { useTaskGroup } from './use-task-group'
export { useFailed } from './use-failed'
+export { useResourceLimit } from './use-resource-limit'
export { useDelayTime } from './use-delay-time'
export { useTimeoutAlarm } from './use-timeout-alarm'
export { usePreTasks } from './use-pre-tasks'
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-resource-limit.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-resource-limit.ts
new file mode 100644
index 0000000000..b86a975eca
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-resource-limit.ts
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { useI18n } from 'vue-i18n'
+import type { IJsonItem } from '../types'
+
+export function useResourceLimit(): IJsonItem[] {
+ const { t } = useI18n()
+ return [
+ {
+ type: 'input-number',
+ field: 'cpuQuota',
+ name: t('project.node.cpu_quota'),
+ span: 12,
+ slots: {
+ suffix: () => t('%')
+ },
+ props: {min: -1}
+ },
+ {
+ type: 'input-number',
+ field: 'memoryMax',
+ name: t('project.node.memory_max'),
+ span: 12,
+ slots: {
+ suffix: () => t('MB')
+ },
+ props: {min: -1}
+ }
+ ]
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 7c87c22a60..70bd156cf2 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -409,7 +409,9 @@ export function formatParams(data: INodeData): {
timeout: data.timeoutFlag ? data.timeout : 0,
timeoutFlag: data.timeoutFlag ? 'OPEN' : 'CLOSE',
timeoutNotifyStrategy: data.timeoutFlag ? timeoutNotifyStrategy : '',
- workerGroup: data.workerGroup
+ workerGroup: data.workerGroup,
+ cpuQuota: data.cpuQuota || -1,
+ memoryMax: data.memoryMax || -1
}
} as {
processDefinitionCode: string
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datax.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datax.ts
index d37b78552b..5bbc366759 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datax.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datax.ts
@@ -42,6 +42,8 @@ export function useDataX({
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
+ cpuQuota: -1,
+ memoryMax: -1,
delayTime: 0,
timeout: 30,
customConfig: false,
@@ -77,6 +79,7 @@ export function useDataX({
Fields.useEnvironmentName(model, !model.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
+ ...Fields.useResourceLimit(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useDataX(model),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-jupyter.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-jupyter.ts
index 93ff593015..18f80f69fd 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-jupyter.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-jupyter.ts
@@ -41,6 +41,8 @@ export function useJupyter({
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
+ cpuQuota: -1,
+ memoryMax: -1,
delayTime: 0,
timeout: 30,
timeoutNotifyStrategy: ['WARN']
@@ -71,6 +73,7 @@ export function useJupyter({
Fields.useEnvironmentName(model, !model.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
+ ...Fields.useResourceLimit(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useJupyter(model),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-python.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-python.ts
index 9bca92bef6..698e84dd5a 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-python.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-python.ts
@@ -42,6 +42,8 @@ export function usePython({
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
+ cpuQuota: -1,
+ memoryMax: -1,
delayTime: 0,
timeout: 30,
rawScript: '',
@@ -73,6 +75,7 @@ export function usePython({
Fields.useEnvironmentName(model, !model.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
+ ...Fields.useResourceLimit(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useShell(model),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts
index f0b87c48fe..a994d217ef 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts
@@ -42,6 +42,8 @@ export function useSeaTunnel({
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
+ cpuQuota: -1,
+ memoryMax: -1,
delayTime: 0,
timeout: 30,
deployMode: 'client',
@@ -77,6 +79,7 @@ export function useSeaTunnel({
Fields.useEnvironmentName(model, !model.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
+ ...Fields.useResourceLimit(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useSeaTunnel(model),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-shell.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-shell.ts
index d0c38cd8e6..2caa420734 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-shell.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-shell.ts
@@ -43,6 +43,8 @@ export function useShell({
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
+ cpuQuota: -1,
+ memoryMax: -1,
delayTime: 0,
rawScript: ''
} as INodeData)
@@ -72,6 +74,7 @@ export function useShell({
Fields.useEnvironmentName(model, !data?.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
+ ...Fields.useResourceLimit(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useShell(model),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sqoop.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sqoop.ts
index 6e69ddc306..50220de931 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sqoop.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sqoop.ts
@@ -41,6 +41,8 @@ export function useSqoop({
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
+ cpuQuota: -1,
+ memoryMax: -1,
delayTime: 0,
timeout: 30,
isCustomTask: false,
@@ -91,6 +93,7 @@ export function useSqoop({
Fields.useEnvironmentName(model, !data?.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
+ ...Fields.useResourceLimit(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useSqoop(model),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index f44e65c95f..f400238b6d 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -365,6 +365,8 @@ interface INodeData
environmentCode?: number | null
failRetryInterval?: number
failRetryTimes?: number
+ cpuQuota?: number
+ memoryMax?: number
flag?: 'YES' | 'NO'
taskGroupId?: number
taskGroupPriority?: number