You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/05/18 09:23:50 UTC

[dolphinscheduler] branch dev updated: [Feature][plugin] Add k8s task in task plugin (#9425)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5bb1eb04fc [Feature][plugin] Add k8s task in task plugin (#9425)
5bb1eb04fc is described below

commit 5bb1eb04fce42dc3e6006d29d595318b1bf15914
Author: He Zhao <32...@users.noreply.github.com>
AuthorDate: Wed May 18 17:23:42 2022 +0800

    [Feature][plugin] Add k8s task in task plugin (#9425)
    
    * [Feature][plugin] Add k8s task in task plugin
    
    * [Feature][plugin] fix dos and code problems
    
    * [Feature][plugin] refactor some code based on sonar
    
    * [Feature][UI] front-end for k8s task plugin
    
    * [Feature][plugin] delete some front files
    
    * [Feature][plugin] update document
    
    * Update docs/docs/zh/guide/task/k8s.md
    
    * Update docs/docs/en/guide/task/k8s.md
    
    * Update docs/docs/en/guide/task/k8s.md
    
    * Update docs/docs/en/guide/task/k8s.md
    
    * [Feature][UI] front-end change from review comments
    
    * [Feature][UI] replace get namespace list api
    
    * [Feature][plugin] change file name
    
    * Add kubernetes to zh task list
    
    Co-authored-by: hezhao2 <he...@cisco.com>
    Co-authored-by: William Tong <we...@cisco.com>
    Co-authored-by: Jiajie Zhong <zh...@gmail.com>
---
 docs/configs/docsdev.js                            |   8 +
 docs/docs/en/guide/task/kubernetes.md              |  44 ++++
 docs/docs/zh/guide/task/kubernetes.md              |  45 ++++
 docs/img/tasks/demo/kubernetes-task-en.png         | Bin 0 -> 486552 bytes
 docs/img/tasks/icons/kubernetes.png                | Bin 0 -> 1383 bytes
 .../builder/TaskExecutionContextBuilder.java       |  14 +-
 .../master/runner/task/BaseTaskProcessor.java      |  65 +++--
 .../service/process/ProcessService.java            |   2 +
 .../service/process/ProcessServiceImpl.java        |  22 ++
 .../dolphinscheduler-task-all/pom.xml              |   6 +
 .../dolphinscheduler-task-api/pom.xml              |   4 +
 .../plugin/task/api/K8sTaskExecutionContext.java   |  43 ++++
 .../plugin/task/api/TaskConstants.java             |  23 ++
 .../plugin/task/api/TaskExecutionContext.java      |  14 +-
 .../plugin/task/api/k8s/AbstractK8sTask.java       |  74 ++++++
 .../task/api/k8s/AbstractK8sTaskExecutor.java      |  63 +++++
 .../plugin/task/api/k8s/K8sTaskMainParameters.java |  93 +++++++
 .../plugin/task/api/k8s/impl/K8sTaskExecutor.java  | 286 +++++++++++++++++++++
 .../plugin/task/api/utils/K8sUtils.java            | 119 +++++++++
 .../plugin/task/api/k8s/K8sTaskExecutorTest.java   | 101 ++++++++
 .../dolphinscheduler-task-k8s/pom.xml              |  55 ++++
 .../dolphinscheduler/plugin/task/k8s/K8sTask.java  |  87 +++++++
 .../plugin/task/k8s/K8sTaskChannel.java            |  49 ++++
 .../plugin/task/k8s/K8sTaskChannelFactory.java     |  44 ++++
 .../plugin/task/k8s/K8sTaskParameters.java         |  88 +++++++
 .../plugin/task/k8s/K8sParametersTest.java         |  58 +++++
 .../plugin/task/k8s/K8sTaskTest.java               |  85 ++++++
 dolphinscheduler-task-plugin/pom.xml               |   1 +
 .../public/images/task-icons/k8s.png               | Bin 0 -> 1392 bytes
 .../public/images/task-icons/k8s_hover.png         | Bin 0 -> 1383 bytes
 dolphinscheduler-ui/src/locales/modules/en_US.ts   |   8 +
 dolphinscheduler-ui/src/locales/modules/zh_CN.ts   |   8 +
 .../src/service/modules/k8s-namespace/index.ts     |   7 +
 .../projects/task/components/node/fields/index.ts  |   2 +
 .../task/components/node/fields/use-k8s.ts         |  58 +++++
 .../task/components/node/fields/use-namespace.ts   |  71 +++++
 .../projects/task/components/node/format-data.ts   |   7 +
 .../projects/task/components/node/tasks/index.ts   |   2 +
 .../projects/task/components/node/tasks/use-k8s.ts |  81 ++++++
 .../views/projects/task/components/node/types.ts   |   6 +
 .../src/views/projects/task/constants/task-type.ts |   5 +
 .../workflow/components/dag/dag.module.scss        |   6 +
 42 files changed, 1733 insertions(+), 21 deletions(-)

diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js
index 9991cf08d8..6feba383d6 100644
--- a/docs/configs/docsdev.js
+++ b/docs/configs/docsdev.js
@@ -174,6 +174,10 @@ export default {
                                 title: 'Jupyter',
                                 link: '/en-us/docs/dev/user_doc/guide/task/jupyter.html',
                             },
+                            {
+                                title: 'Kubernetes',
+                                link: '/en-us/docs/dev/user_doc/guide/task/kubernetes.html',
+                            },
                         ],
                     },
                     {
@@ -525,6 +529,10 @@ export default {
                                 title: 'Jupyter',
                                 link: '/zh-cn/docs/dev/user_doc/guide/task/jupyter.html',
                             },
+                            {
+                                title: 'Kubernetes',
+                                link: '/zh-cn/docs/dev/user_doc/guide/task/kubernetes.html',
+                            },
                         ],
                     },
                     {
diff --git a/docs/docs/en/guide/task/kubernetes.md b/docs/docs/en/guide/task/kubernetes.md
new file mode 100644
index 0000000000..bc024b6100
--- /dev/null
+++ b/docs/docs/en/guide/task/kubernetes.md
@@ -0,0 +1,44 @@
+# K8S Node
+
+## Overview
+
+K8S task type used to execute a batch task. In this task, the worker submits the task by using a k8s client.
+
+## Create Task
+
+- Click `Project -> Management-Project -> Name-Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page.
+- Drag from the toolbar <img src="/img/tasks/icons/kubernetes.png" width="15"/> to the canvas.
+
+## Task Parameter
+
+- **Node name**: The node name in a workflow definition is unique.
+- **Run flag**: Identifies whether this node schedules normally, if it does not need to execute, select the `prohibition execution`.
+- **Descriptive information**: Describe the function of the node.
+- **Task priority**: When the number of worker threads is insufficient, execute in the order of priority from high to low, and tasks with the same priority will execute in a first-in first-out order.
+- **Worker grouping**: Assign tasks to the machines of the worker group to execute. If `Default` is selected, randomly select a worker machine for execution.
+- **Environment Name**: Configure the environment name in which to run the task.
+- **Times of failed retry attempts**: The number of times the task failed to resubmit.
+- **Failed retry interval**: The time interval (unit minute) for resubmitting the task after a failed task.
+- **Delayed execution time**: The time (unit minute) that a task delays in execution.
+- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm email will send and the task execution will fail.
+- **Namespace**::the namespace for running k8s task
+- **Min CPU**:min CPU requirement for running k8s task
+- **Min Memory**:min memory requirement for running k8s task
+- **Image**:the registry url for image 
+- **Custom parameter**: It is a local user-defined parameter for K8S task, these params will pass to container as environment variables.
+- **Predecessor task**: Selecting a predecessor task for the current task, will set the selected predecessor task as upstream of the current task.
+## Task Example
+
+### Configure the K8S Environment in DolphinScheduler
+
+If you are using the K8S task type in a production environment, the K8S cluster environment is required.
+
+### Configure K8S Nodes
+
+Configure the required content according to the parameter descriptions above.
+
+![K8S](/img/tasks/demo/kubernetes-task-en.png)
+
+## Notice
+
+Task name contains only lowercase alphanumeric characters or '-'
diff --git a/docs/docs/zh/guide/task/kubernetes.md b/docs/docs/zh/guide/task/kubernetes.md
new file mode 100644
index 0000000000..7802774b7c
--- /dev/null
+++ b/docs/docs/zh/guide/task/kubernetes.md
@@ -0,0 +1,45 @@
+# Kubernetes
+
+## 综述
+
+kubernetes任务类型,用于在kubernetes上执行一个短时和批处理的任务。worker最终会通过使用kubernetes client提交任务。
+
+## 创建任务
+
+- 点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入DAG编辑页面。
+- 工具栏中拖动 <img src="/img/tasks/icons/kubernetes.png" width="25"/> 到画板中,选择需要连接的数据源,即可完成创建。
+
+## 任务参数
+
+- 节点名称:设置任务的名称。一个工作流定义中的节点名称是唯一的。
+- 运行标志:标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
+- 描述:描述该节点的功能。
+- 任务优先级:worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
+- Worker 分组:任务分配给 worker 组的机器执行,选择 Default 会随机选择一台 worker 机执行。
+- 环境名称:配置运行任务的环境。
+- 失败重试次数:任务失败重新提交的次数。
+- 失败重试间隔:任务失败重新提交任务的时间间隔,以分为单位。
+- 延迟执行时间:任务延迟执行的时间,以分为单位。
+- 超时警告:勾选超时警告、超时失败,当任务超过“超时时长”后,会发送告警邮件并且任务执行失败。
+- 命名空间:选择kubernetes集群上存在的命名空间
+- 最小CPU:任务在kubernetes上运行所需的最小CPU
+- 最小内存:任务在kubernetes上运行所需的最小内存
+- 镜像:镜像地址
+- 自定义参数:kubernetes任务局部的用户自定义参数,自定义参数最终会通过环境变量形式存在于容器中,提供给kubernetes任务使用
+- 前置任务:在当前kubernetes任务之前需要执行的任务
+
+## 任务样例
+
+### 在 DolphinScheduler 中配置 kubernetes 集群环境
+
+若生产环境中要是使用到 kubernetes 任务类型,则需要预先配置好所需的kubernetes集群环境
+
+### 配置 kubernetes 任务节点
+
+根据上述参数说明,配置所需的内容即可。
+
+![kubernetes](/img/tasks/demo/kubernetes-task-en.png)
+
+## 注意事项
+
+任务名字限制在小写字母、数字和-这三种字符之中
\ No newline at end of file
diff --git a/docs/img/tasks/demo/kubernetes-task-en.png b/docs/img/tasks/demo/kubernetes-task-en.png
new file mode 100644
index 0000000000..30e732b627
Binary files /dev/null and b/docs/img/tasks/demo/kubernetes-task-en.png differ
diff --git a/docs/img/tasks/icons/kubernetes.png b/docs/img/tasks/icons/kubernetes.png
new file mode 100644
index 0000000000..ba79a67d22
Binary files /dev/null and b/docs/img/tasks/icons/kubernetes.png differ
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 25a55fd94e..305af6b867 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@@ -70,7 +71,7 @@ public class TaskExecutionContextBuilder {
         if (taskDefinition.getTimeoutFlag() == TimeoutFlag.OPEN) {
             taskExecutionContext.setTaskTimeoutStrategy(taskDefinition.getTimeoutNotifyStrategy());
             if (taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.FAILED
-                    || taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.WARNFAILED) {
+                || taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.WARNFAILED) {
                 taskExecutionContext.setTaskTimeout(Math.min(taskDefinition.getTimeout() * SEC_2_MINUTES_TIME_UNIT, Integer.MAX_VALUE));
             }
         }
@@ -117,12 +118,23 @@ public class TaskExecutionContextBuilder {
         taskExecutionContext.setResourceParametersHelper(parametersHelper);
         return this;
     }
+    /**
+     * build k8sTask related info
+     *
+     * @param k8sTaskExecutionContext sqoopTaskExecutionContext
+     * @return TaskExecutionContextBuilder
+     */
 
+    public TaskExecutionContextBuilder buildK8sTaskRelatedInfo(K8sTaskExecutionContext k8sTaskExecutionContext) {
+        taskExecutionContext.setK8sTaskExecutionContext(k8sTaskExecutionContext);
+        return this;
+    }
     /**
      * create
      *
      * @return taskExecutionContext
      */
+
     public TaskExecutionContext create() {
         return taskExecutionContext;
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 38fef8f44f..2ca0d6cb19 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -17,8 +17,24 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
-import com.zaxxer.hikari.HikariDataSource;
-import org.apache.commons.collections.CollectionUtils;
+import static org.apache.dolphinscheduler.common.Constants.ADDRESS;
+import static org.apache.dolphinscheduler.common.Constants.DATABASE;
+import static org.apache.dolphinscheduler.common.Constants.JDBC_URL;
+import static org.apache.dolphinscheduler.common.Constants.OTHER;
+import static org.apache.dolphinscheduler.common.Constants.PASSWORD;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.USER;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_K8S;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_CONNECTOR_TYPE;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_DATASOURCE_ID;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_CONNECTOR_TYPE;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_DATASOURCE_ID;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -35,6 +51,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -52,6 +69,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncPa
 import org.apache.dolphinscheduler.plugin.task.api.utils.JdbcUrlParser;
 import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
 import org.apache.dolphinscheduler.plugin.task.dq.DataQualityParameters;
+import org.apache.dolphinscheduler.plugin.task.k8s.K8sTaskParameters;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -60,8 +78,8 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.commons.collections.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -73,21 +91,10 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.dolphinscheduler.common.Constants.ADDRESS;
-import static org.apache.dolphinscheduler.common.Constants.DATABASE;
-import static org.apache.dolphinscheduler.common.Constants.JDBC_URL;
-import static org.apache.dolphinscheduler.common.Constants.OTHER;
-import static org.apache.dolphinscheduler.common.Constants.PASSWORD;
-import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
-import static org.apache.dolphinscheduler.common.Constants.USER;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_CONNECTOR_TYPE;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_DATASOURCE_ID;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_CONNECTOR_TYPE;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_DATASOURCE_ID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.zaxxer.hikari.HikariDataSource;
 
 public abstract class BaseTaskProcessor implements ITaskProcessor {
 
@@ -278,6 +285,10 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
         if (TASK_TYPE_DATA_QUALITY.equalsIgnoreCase(taskInstance.getTaskType())) {
             setDataQualityTaskRelation(dataQualityTaskExecutionContext,taskInstance,tenant.getTenantCode());
         }
+        K8sTaskExecutionContext k8sTaskExecutionContext = new K8sTaskExecutionContext();
+        if (TASK_TYPE_K8S.equalsIgnoreCase(taskInstance.getTaskType())) {
+            setK8sTaskRelation(k8sTaskExecutionContext, taskInstance);
+        }
 
         return TaskExecutionContextBuilder.get()
                 .buildTaskInstanceRelatedInfo(taskInstance)
@@ -286,6 +297,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
                 .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
                 .buildResourceParametersInfo(resources)
                 .buildDataQualityTaskExecutionContext(dataQualityTaskExecutionContext)
+                .buildK8sTaskRelatedInfo(k8sTaskExecutionContext)
                 .create();
     }
 
@@ -579,4 +591,19 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
 
         return resourcesMap;
     }
+
+    /**
+     * set k8s task relation
+     * @param k8sTaskExecutionContext k8sTaskExecutionContext
+     * @param taskInstance taskInstance
+     */
+    private void setK8sTaskRelation(K8sTaskExecutionContext k8sTaskExecutionContext, TaskInstance taskInstance) {
+        K8sTaskParameters k8sTaskParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), K8sTaskParameters.class);
+        Map<String,String> namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
+        String clusterName = namespace.get(CLUSTER);
+        String configYaml = processService.findConfigYamlByName(clusterName);
+        if (configYaml != null) {
+            k8sTaskExecutionContext.setConfigYaml(configYaml);
+        }
+    }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index bf48ab9cea..9ff8689fd3 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -296,4 +296,6 @@ public interface ProcessService {
                               org.apache.dolphinscheduler.remote.command.CommandType taskType);
 
     ProcessInstance loadNextProcess4Serial(long code, int state, int id);
+
+    public String findConfigYamlByName(String clusterName) ;
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 5136891280..20058539a1 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -62,6 +62,7 @@ import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry;
 import org.apache.dolphinscheduler.dao.entity.DqTaskStatisticsValue;
 import org.apache.dolphinscheduler.dao.entity.Environment;
 import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
+import org.apache.dolphinscheduler.dao.entity.K8s;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -90,6 +91,7 @@ import org.apache.dolphinscheduler.dao.mapper.DqRuleMapper;
 import org.apache.dolphinscheduler.dao.mapper.DqTaskStatisticsValueMapper;
 import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
 import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.K8sMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
@@ -269,6 +271,9 @@ public class ProcessServiceImpl implements ProcessService {
     @Autowired
     private TaskPluginManager taskPluginManager;
 
+    @Autowired
+    private K8sMapper k8sMapper;
+
     /**
      * handle Command (construct ProcessInstance from Command) , wrapped in transaction
      *
@@ -3042,4 +3047,21 @@ public class ProcessServiceImpl implements ProcessService {
             throw new ServiceException("delete command fail, id:" + commandId);
         }
     }
+    /**
+     * find k8s config yaml by clusterName
+     *
+     * @param clusterName clusterName
+     * @return datasource
+     */
+
+    @Override
+    public String findConfigYamlByName(String clusterName) {
+        if (StringUtils.isEmpty(clusterName)) {
+            return null;
+        }
+        QueryWrapper<K8s> nodeWrapper = new QueryWrapper<>();
+        nodeWrapper.eq("k8s_name", clusterName);
+        K8s k8s = k8sMapper.selectOne(nodeWrapper);
+        return k8s.getK8sConfig();
+    }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
index 5185b3ce28..2c5c7fc6df 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
@@ -153,6 +153,12 @@
             <artifactId>dolphinscheduler-task-blocking</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-k8s</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
index 64b2aa05d5..b271dda0f0 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
@@ -293,5 +293,9 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-client</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
new file mode 100644
index 0000000000..4916d8e57f
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api;
+
+import java.io.Serializable;
+
+/**
+ *  k8s Task ExecutionContext
+ */
+
+public class K8sTaskExecutionContext implements Serializable {
+    private String configYaml;
+
+    public String getConfigYaml() {
+        return configYaml;
+    }
+
+    public void setConfigYaml(String configYaml) {
+        this.configYaml = configYaml;
+    }
+
+    @Override
+    public String toString() {
+        return "K8sTaskExecutionContext{"
+            + "configYaml='" + configYaml + '\''
+            + '}';
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
index 12b6c1c19e..c1a1ddadc1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
@@ -117,6 +117,10 @@ public class TaskConstants {
      * exit code success
      */
     public static final int EXIT_CODE_SUCCESS = 0;
+    /**
+     * running code
+     */
+    public static final int RUNNING_CODE = 1;
 
     public static final String SH = "sh";
 
@@ -387,6 +391,8 @@ public class TaskConstants {
 
     public static final String TASK_TYPE_DATA_QUALITY = "DATA_QUALITY";
 
+    public static final String TASK_TYPE_K8S = "K8S";
+
     public static final String TASK_TYPE_BLOCKING = "BLOCKING";
 
     public static final List<String> COMPLEX_TASK_TYPES = Arrays.asList(new String[]{TASK_TYPE_CONDITIONS, TASK_TYPE_SWITCH, TASK_TYPE_SUB_PROCESS, TASK_TYPE_DEPENDENT});
@@ -398,6 +404,23 @@ public class TaskConstants {
     public static final String AWS_SECRET_ACCESS_KEY= "aws.secret.access.key";
     public static final String AWS_REGION = "aws.region";
 
+    /**
+     * use for k8s task
+     */
+    public static final String API_VERSION = "batch/v1";
+    public static final String IMAGE_PULL_POLICY = "Always";
+    public static final String RESTART_POLICY = "Never";
+    public static final String MEMORY = "memory";
+    public static final String CPU = "cpu";
+    public static final String LAYER_LABEL = "k8s.cn/layer";
+    public static final String LAYER_LABEL_VALUE = "batch";
+    public static final String NAME_LABEL = "k8s.cn/name";
+    public static final String  TASK_INSTANCE_ID = "taskInstanceId";
+    public static final String MI = "Mi";
+    public static final int JOB_TTL_SECONDS = 300;
+    public static final int LOG_LINES = 500;
+    public static final String NAMESPACE_NAME = "name";
+    public static final String CLUSTER = "cluster";
     /**
      * zeppelin config
      */
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index 581c73c8f1..022dbf1327 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -216,7 +216,10 @@ public class TaskExecutionContext {
      * sql TaskExecutionContext
      */
     private SQLTaskExecutionContext sqlTaskExecutionContext;
-
+    /**
+     * k8s TaskExecutionContext
+     */
+    private K8sTaskExecutionContext k8sTaskExecutionContext;
     /**
      * resources full name and tenant code
      */
@@ -564,6 +567,14 @@ public class TaskExecutionContext {
         this.endTime = endTime;
     }
 
+    public K8sTaskExecutionContext getK8sTaskExecutionContext() {
+        return k8sTaskExecutionContext;
+    }
+
+    public void setK8sTaskExecutionContext(K8sTaskExecutionContext k8sTaskExecutionContext) {
+        this.k8sTaskExecutionContext = k8sTaskExecutionContext;
+    }
+
     @Override
     public String toString() {
         return "TaskExecutionContext{"
@@ -601,6 +612,7 @@ public class TaskExecutionContext {
                 + ", delayTime=" + delayTime
                 + ", resources=" + resources
                 + ", sqlTaskExecutionContext=" + sqlTaskExecutionContext
+                + ", k8sTaskExecutionContext=" + k8sTaskExecutionContext
                 + ", dataQualityTaskExecutionContext=" + dataQualityTaskExecutionContext
                 + '}';
     }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
new file mode 100644
index 0000000000..8d7f7eac7f
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+
+public abstract class AbstractK8sTask extends AbstractTaskExecutor {
+    /**
+     * process task
+     */
+    private AbstractK8sTaskExecutor abstractK8sTaskExecutor;
+    /**
+     * Abstract k8s Task
+     *
+     * @param taskRequest taskRequest
+     */
+    protected AbstractK8sTask(TaskExecutionContext taskRequest) {
+        super(taskRequest);
+        this.abstractK8sTaskExecutor = new K8sTaskExecutor(logger,taskRequest);
+    }
+
+    @Override
+    public void handle() throws Exception {
+        try {
+            TaskResponse response = abstractK8sTaskExecutor.run(buildCommand());
+            setExitStatusCode(response.getExitStatusCode());
+            setAppIds(response.getAppIds());
+        } catch (Exception e) {
+            exitStatusCode = -1;
+            throw new TaskException("k8s process failure",e);
+        }
+    }
+
+    /**
+     * cancel application
+     *
+     * @param status status
+     * @throws Exception exception
+     */
+    @Override
+    public void cancelApplication(boolean status) throws Exception {
+        cancel = true;
+        // cancel process
+        abstractK8sTaskExecutor.cancelApplication(buildCommand());
+    }
+
+    /**
+     * create command
+     *
+     * @return String
+     * @throws Exception exception
+     */
+    protected abstract String buildCommand();
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
new file mode 100644
index 0000000000..1d619b26b2
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils;
+
+import org.slf4j.Logger;
+
+public abstract class AbstractK8sTaskExecutor {
+    protected Logger logger;
+    protected TaskExecutionContext taskRequest;
+    protected K8sUtils k8sUtils;
+    protected StringBuilder logStringBuffer;
+
+    protected AbstractK8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) {
+        this.logger = logger;
+        this.taskRequest = taskRequest;
+        this.k8sUtils = new K8sUtils();
+        this.logStringBuffer = new StringBuilder();
+    }
+
+    public abstract TaskResponse run(String k8sParameterStr) throws Exception;
+
+    public abstract void cancelApplication(String k8sParameterStr);
+
+    public void waitTimeout(Boolean timeout) throws TaskException {
+        if (Boolean.TRUE.equals(timeout)) {
+            throw new TaskException("K8sTask is timeout");
+        }
+    }
+
+    public void flushLog(TaskResponse taskResponse) {
+        if (logStringBuffer.length() != 0 && taskResponse.getExitStatusCode() == EXIT_CODE_FAILURE) {
+            logger.error(logStringBuffer.toString());
+        } else if (logStringBuffer.length() != 0) {
+            logger.info(logStringBuffer.toString());
+        }
+    }
+
+    public abstract void submitJob2k8s(String k8sParameterStr);
+
+    public abstract void stopJobOnK8s(String k8sParameterStr);
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
new file mode 100644
index 0000000000..86caf43934
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+
+import java.util.Map;
+
+/**
+ * k8s task parameters
+ */
+public class K8sTaskMainParameters {
+
+    private String image;
+    private String namespaceName;
+    private String clusterName;
+    private double minCpuCores;
+    private double minMemorySpace;
+    private Map<String, String> paramsMap;
+
+    public String getImage() {
+        return image;
+    }
+
+    public void setImage(String image) {
+        this.image = image;
+    }
+
+    public double getMinCpuCores() {
+        return minCpuCores;
+    }
+
+    public void setMinCpuCores(double minCpuCores) {
+        this.minCpuCores = minCpuCores;
+    }
+
+    public double getMinMemorySpace() {
+        return minMemorySpace;
+    }
+
+    public void setMinMemorySpace(double minMemorySpace) {
+        this.minMemorySpace = minMemorySpace;
+    }
+
+    public String getNamespaceName() {
+        return namespaceName;
+    }
+
+    public void setNamespaceName(String namespaceName) {
+        this.namespaceName = namespaceName;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public Map<String, String> getParamsMap() {
+        return paramsMap;
+    }
+
+    public void setParamsMap(Map<String, String> paramsMap) {
+        this.paramsMap = paramsMap;
+    }
+
+    @Override
+    public String toString() {
+        return "K8sTaskMainParameters{"
+             + "image='" + image + '\''
+             + ", namespaceName='" + namespaceName + '\''
+             + ", clusterName='" + clusterName + '\''
+             + ", minCpuCores=" + minCpuCores
+             + ", minMemorySpace=" + minMemorySpace
+             + ", paramsMap=" + paramsMap
+             + '}';
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
new file mode 100644
index 0000000000..79e1c48af3
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.k8s.impl;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.API_VERSION;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CPU;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.IMAGE_PULL_POLICY;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JOB_TTL_SECONDS;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL_VALUE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.MEMORY;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.MI;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAME_LABEL;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RESTART_POLICY;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_INSTANCE_ID;
+
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.api.model.ResourceRequirements;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+
+/**
+ * K8sTaskExecutor used to submit k8s task to K8S
+ */
+public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
+    private Job job;
+    public K8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) {
+        super(logger,taskRequest);
+    }
+
+    public Job buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) {
+        String taskInstanceId = String.valueOf(taskRequest.getTaskInstanceId());
+        String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT);
+        String image = k8STaskMainParameters.getImage();
+        String namespaceName = k8STaskMainParameters.getNamespaceName();
+        Map<String, String> otherParams = k8STaskMainParameters.getParamsMap();
+        Double podMem = k8STaskMainParameters.getMinMemorySpace();
+        Double podCpu = k8STaskMainParameters.getMinCpuCores();
+        Double limitPodMem = podMem * 2;
+        Double limitPodCpu = podCpu * 2;
+        int retryNum = 0;
+        String k8sJobName = String.format("%s-%s", taskName, taskInstanceId);
+        Map<String, Quantity> reqRes = new HashMap<>();
+        reqRes.put(MEMORY, new Quantity(String.format("%s%s", podMem, MI)));
+        reqRes.put(CPU, new Quantity(String.valueOf(podCpu)));
+        Map<String, Quantity> limitRes = new HashMap<>();
+        limitRes.put(MEMORY, new Quantity(String.format("%s%s", limitPodMem, MI)));
+        limitRes.put(CPU, new Quantity(String.valueOf(limitPodCpu)));
+        Map<String, String> labelMap = new HashMap<>();
+        labelMap.put(LAYER_LABEL, LAYER_LABEL_VALUE);
+        labelMap.put(NAME_LABEL, k8sJobName);
+        EnvVar taskInstanceIdVar = new EnvVar(TASK_INSTANCE_ID, taskInstanceId, null);
+        List<EnvVar> envVars = new ArrayList<>();
+        envVars.add(taskInstanceIdVar);
+        if (MapUtils.isNotEmpty(otherParams)) {
+            for (Map.Entry<String,String> entry : otherParams.entrySet()) {
+                String param = entry.getKey();
+                String paramValue = entry.getValue();
+                EnvVar envVar = new EnvVar(param, paramValue, null);
+                envVars.add(envVar);
+            }
+        }
+        return new JobBuilder()
+            .withApiVersion(API_VERSION)
+            .withNewMetadata()
+            .withName(k8sJobName)
+            .withLabels(labelMap)
+            .withNamespace(namespaceName)
+            .endMetadata()
+            .withNewSpec()
+            .withTtlSecondsAfterFinished(JOB_TTL_SECONDS)
+            .withNewTemplate()
+            .withNewSpec()
+            .addNewContainer()
+            .withName(k8sJobName)
+            .withImage(image)
+            .withImagePullPolicy(IMAGE_PULL_POLICY)
+            .withResources(new ResourceRequirements(limitRes, reqRes))
+            .withEnv(envVars)
+            .endContainer()
+            .withRestartPolicy(RESTART_POLICY)
+            .endSpec()
+            .endTemplate()
+            .withBackoffLimit(retryNum)
+            .endSpec()
+            .build();
+    }
+
+    public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse taskResponse, K8sTaskMainParameters k8STaskMainParameters) {
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        Watcher<Job> watcher = new Watcher<Job>() {
+            @Override
+            public void eventReceived(Action action, Job job) {
+                if (action != Action.ADDED) {
+                    int jobStatus = getK8sJobStatus(job);
+                    setTaskStatus(jobStatus,taskInstanceId, taskResponse, k8STaskMainParameters);
+                    countDownLatch.countDown();
+                    }
+                }
+
+            @Override
+            public void onClose(WatcherException e) {
+                logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail in k8s: %s",job.getMetadata().getName(),e.getMessage()));
+                taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
+                countDownLatch.countDown();
+            }
+
+            @Override
+            public void onClose() {
+                logger.warn("Watch gracefully closed");
+            }
+        };
+        Watch watch = null;
+        try {
+            watch = k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher);
+            boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED
+                || taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
+            if (timeoutFlag) {
+                Boolean timeout = !(countDownLatch.await(taskRequest.getTaskTimeout(), TimeUnit.SECONDS));
+                waitTimeout(timeout);
+            } else {
+                countDownLatch.await();
+            }
+            flushLog(taskResponse);
+        } catch (InterruptedException e) {
+            logger.error("job failed in k8s: {}",e.getMessage(), e);
+            Thread.currentThread().interrupt();
+            taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
+        } catch (Exception e) {
+            logger.error("job failed in k8s: {}",e.getMessage(), e);
+            taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
+        } finally {
+            if (watch != null) {
+                watch.close();
+            }
+        }
+    }
+
+    @Override
+    public TaskResponse run(String k8sParameterStr) throws Exception {
+        TaskResponse result = new TaskResponse();
+        int taskInstanceId = taskRequest.getTaskInstanceId();
+        K8sTaskMainParameters k8STaskMainParameters = JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class);
+        try {
+            if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
+                result.setExitStatusCode(EXIT_CODE_KILL);
+                return result;
+            }
+            if (StringUtils.isEmpty(k8sParameterStr)) {
+                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
+                return result;
+            }
+            K8sTaskExecutionContext k8sTaskExecutionContext = taskRequest.getK8sTaskExecutionContext();
+            String  configYaml = k8sTaskExecutionContext.getConfigYaml();
+            k8sUtils.buildClient(configYaml);
+            submitJob2k8s(k8sParameterStr);
+            registerBatchJobWatcher(job, Integer.toString(taskInstanceId), result, k8STaskMainParameters);
+        } catch (Exception e) {
+            cancelApplication(k8sParameterStr);
+            result.setExitStatusCode(EXIT_CODE_FAILURE);
+            throw e;
+        }
+        return result;
+    }
+
+    @Override
+    public void cancelApplication(String k8sParameterStr) {
+        if (job != null) {
+            stopJobOnK8s(k8sParameterStr);
+        }
+    }
+
+    @Override
+    public void submitJob2k8s(String k8sParameterStr) {
+        int taskInstanceId = taskRequest.getTaskInstanceId();
+        String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT);
+        K8sTaskMainParameters k8STaskMainParameters = JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class);
+        try {
+            logger.info("[K8sJobExecutor-{}-{}] start to submit job", taskName, taskInstanceId);
+            job = buildK8sJob(k8STaskMainParameters);
+            stopJobOnK8s(k8sParameterStr);
+            String namespaceName = k8STaskMainParameters.getNamespaceName();
+            k8sUtils.createJob(namespaceName, job);
+            logger.info("[K8sJobExecutor-{}-{}]  submitted job successfully", taskName, taskInstanceId);
+        } catch (Exception e) {
+            logger.error("[K8sJobExecutor-{}-{}]  fail to submit job", taskName, taskInstanceId);
+            throw new TaskException("K8sJobExecutor fail to submit job", e);
+        }
+    }
+
+    @Override
+    public void stopJobOnK8s(String k8sParameterStr) {
+        K8sTaskMainParameters k8STaskMainParameters = JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class);
+        String namespaceName = k8STaskMainParameters.getNamespaceName();
+        String jobName = job.getMetadata().getName();
+        try {
+            if (Boolean.TRUE.equals(k8sUtils.jobExist(jobName, namespaceName))) {
+                k8sUtils.deleteJob(jobName, namespaceName);
+            }
+        } catch (Exception e) {
+            logger.error("[K8sJobExecutor-{}]  fail to stop job", jobName);
+            throw new TaskException("K8sJobExecutor fail to stop job", e);
+        }
+    }
+
+    public int getK8sJobStatus(Job job) {
+        JobStatus jobStatus = job.getStatus();
+        if (jobStatus.getSucceeded() != null && jobStatus.getSucceeded() == 1) {
+            return EXIT_CODE_SUCCESS;
+        } else if (jobStatus.getFailed() != null && jobStatus.getFailed() == 1) {
+            return EXIT_CODE_FAILURE;
+        } else {
+            return TaskConstants.RUNNING_CODE;
+        }
+    }
+
+    public void setTaskStatus(int jobStatus,String taskInstanceId, TaskResponse taskResponse, K8sTaskMainParameters k8STaskMainParameters) {
+        if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) {
+            if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId))) {
+                logStringBuffer.append(String.format("[K8sJobExecutor-%s] killed", job.getMetadata().getName()));
+                taskResponse.setExitStatusCode(EXIT_CODE_KILL);
+            } else if (jobStatus == EXIT_CODE_SUCCESS) {
+                logStringBuffer.append(String.format("[K8sJobExecutor-%s] succeed in k8s", job.getMetadata().getName()));
+                taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS);
+            } else {
+                String errorMessage = k8sUtils.getPodLog(job.getMetadata().getName(), k8STaskMainParameters.getNamespaceName());
+                logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail in k8s: %s", job.getMetadata().getName(), errorMessage));
+                taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
+            }
+        }
+    }
+
+    public Job getJob() {
+        return job;
+    }
+
+    public void setJob(Job job) {
+        this.job = job;
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java
new file mode 100644
index 0000000000..f2b104875b
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LOG_LINES;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobList;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+
+public class K8sUtils {
+    private static final Logger log = LoggerFactory.getLogger(K8sUtils.class);
+    private KubernetesClient client;
+
+    public void createJob(String namespace, Job job) {
+        try {
+            client.batch().v1()
+                .jobs()
+                .inNamespace(namespace)
+                .create(job);
+        } catch (Exception e) {
+            throw new TaskException("fail to create job",e);
+        }
+    }
+
+    public void deleteJob(String jobName, String namespace) {
+        try {
+            client.batch().v1()
+                .jobs()
+                .inNamespace(namespace)
+                .withName(jobName)
+                .delete();
+        } catch (Exception e) {
+            throw new TaskException("fail to delete job",e);
+        }
+    }
+
+    public Boolean jobExist(String jobName, String namespace) {
+        Optional<Job> result;
+        try {
+            JobList jobList = client.batch().v1().jobs().inNamespace(namespace).list();
+            List<Job> jobs = jobList.getItems();
+            result = jobs.stream()
+                .filter(job -> job.getMetadata().getName().equals(jobName))
+                .findFirst();
+            return result.isPresent();
+        } catch (Exception e) {
+            throw new TaskException("fail to check job: ", e);
+        }
+    }
+
+    public Watch createBatchJobWatcher(String jobName, Watcher<Job> watcher) {
+        try {
+            return client.batch().v1()
+                .jobs().withName(jobName).watch(watcher);
+        } catch (Exception e) {
+            throw new TaskException("fail to register batch job watcher",e);
+        }
+    }
+
+    public String getPodLog(String jobName, String namespace) {
+        try {
+            List<Pod> podList = client.pods().inNamespace(namespace).list().getItems();
+            String podName = null;
+            for (Pod pod : podList) {
+                podName = pod.getMetadata().getName();
+                if (jobName.equals(podName.substring(0, pod.getMetadata().getName().lastIndexOf("-")))) {
+                    break;
+                }
+            }
+            return client.pods().inNamespace(namespace)
+                .withName(podName)
+                .tailingLines(LOG_LINES)
+                .getLog(Boolean.TRUE);
+        } catch (Exception e) {
+            log.error("fail to getPodLog", e);
+            log.error("response bodies : {}", e.getMessage());
+        }
+        return null;
+    }
+
+    public void buildClient(String configYaml) {
+        try {
+            Config config = Config.fromKubeconfig(configYaml);
+            client = new DefaultKubernetesClient(config);
+        } catch (Exception e) {
+            throw new TaskException("fail to build k8s ApiClient",e);
+        }
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
new file mode 100644
index 0000000000..db30248487
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;
+import static org.hamcrest.Matchers.is;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class K8sTaskExecutorTest {
+    private K8sTaskExecutor k8sTaskExecutor = null;
+    private K8sTaskMainParameters k8sTaskMainParameters = null;
+    private final String image = "ds-dev";
+    private final String namespace = "{\"name\":\"default\",\"cluster\":\"lab\"}";
+    private final double minCpuCores = 2;
+    private final double minMemorySpace = 10;
+    private final int taskInstanceId = 1000;
+    private final String taskName = "k8s_task_test";
+    private Job job;
+    @Before
+    public void before() {
+        TaskExecutionContext taskRequest = new TaskExecutionContext();
+        taskRequest.setTaskInstanceId(taskInstanceId);
+        taskRequest.setTaskName(taskName);
+        Map<String,String> namespace = JSONUtils.toMap(this.namespace);
+        String namespaceName = namespace.get(NAMESPACE_NAME);
+        String clusterName = namespace.get(CLUSTER);
+        k8sTaskExecutor = new K8sTaskExecutor(null,taskRequest);
+        k8sTaskMainParameters = new K8sTaskMainParameters();
+        k8sTaskMainParameters.setImage(image);
+        k8sTaskMainParameters.setNamespaceName(namespaceName);
+        k8sTaskMainParameters.setClusterName(clusterName);
+        k8sTaskMainParameters.setMinCpuCores(minCpuCores);
+        k8sTaskMainParameters.setMinMemorySpace(minMemorySpace);
+        job = k8sTaskExecutor.buildK8sJob(k8sTaskMainParameters);
+    }
+    @Test
+    public void testBuildK8sJobNormal() {
+        String jobStr = "Job(apiVersion=batch/v1, kind=Job, metadata=ObjectMeta(annotations=null, clusterName=null, creationTimestamp=null, deletionGracePeriodSeconds=null, deletionTimestamp=null, finalizers=[], generateName=null, generation=null, labels={k8s.cn/layer=batch, k8s.cn/name=k8s_task_test-1000}, managedFields=[], name=k8s_task_test-1000, namespace=default, ownerReferences=[], resourceVersion=null, selfLink=null, uid=null, additionalProperties={}), spec=JobSpec(activeDeadlineS [...]
+        Assert.assertEquals(jobStr, job.toString());
+    }
+    @Test
+    public void testGetJobNormal() {
+        k8sTaskExecutor.setJob(job);
+        String jobStr = "Job(apiVersion=batch/v1, kind=Job, metadata=ObjectMeta(annotations=null, clusterName=null, creationTimestamp=null, deletionGracePeriodSeconds=null, deletionTimestamp=null, finalizers=[], generateName=null, generation=null, labels={k8s.cn/layer=batch, k8s.cn/name=k8s_task_test-1000}, managedFields=[], name=k8s_task_test-1000, namespace=default, ownerReferences=[], resourceVersion=null, selfLink=null, uid=null, additionalProperties={}), spec=JobSpec(activeDeadlineS [...]
+        Assert.assertEquals(jobStr,k8sTaskExecutor.getJob().toString());
+    }
+    @Test
+    public void testGetK8sJobStatusNormal() {
+        JobStatus jobStatus = new JobStatus();
+        jobStatus.setSucceeded(1);
+        job.setStatus(jobStatus);
+        Assert.assertEquals(0, Integer.compare(0,k8sTaskExecutor.getK8sJobStatus(job)));
+    }
+    @Test
+    public void testSetTaskStatusNormal() {
+        int jobStatus = 0;
+        TaskResponse taskResponse = new TaskResponse();
+        K8sTaskMainParameters k8STaskMainParameters = new K8sTaskMainParameters();
+        k8sTaskExecutor.setJob(job);
+        k8sTaskExecutor.setTaskStatus(jobStatus,String.valueOf(taskInstanceId),taskResponse,k8STaskMainParameters);
+        Assert.assertEquals(0, Integer.compare(EXIT_CODE_KILL,taskResponse.getExitStatusCode()));
+    }
+    @Test
+    public void testWaitTimeoutNormal() {
+        try {
+            k8sTaskExecutor.waitTimeout(true);
+        } catch (TaskException e) {
+            Assert.assertThat(e.getMessage(),is("K8sTask is timeout"));
+        }
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/pom.xml
new file mode 100644
index 0000000000..c49e74d102
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dolphinscheduler-task-plugin</artifactId>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dolphinscheduler-task-k8s</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-datasource-all</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-spi</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-datasource-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
new file mode 100644
index 0000000000..a5ce9992f1
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.k8s;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTask;
+import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class K8sTask extends AbstractK8sTask {
+
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext taskExecutionContext;
+
+    /**
+     * task parameters
+     */
+    private final K8sTaskParameters k8sTaskParameters;
+
+    /**
+     * @param taskRequest taskRequest
+     */
+    public K8sTask(TaskExecutionContext taskRequest) {
+        super(taskRequest);
+        this.taskExecutionContext = taskRequest;
+        this.k8sTaskParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), K8sTaskParameters.class);
+        if (!k8sTaskParameters.checkParameters()) {
+            throw new TaskException("K8S task params is not valid");
+        }
+    }
+
+    @Override
+    public AbstractParameters getParameters() {
+        return k8sTaskParameters;
+    }
+
+    @Override
+    protected String buildCommand() {
+        K8sTaskMainParameters k8sTaskMainParameters = new K8sTaskMainParameters();
+        Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+        if (MapUtils.isEmpty(paramsMap)) {
+            paramsMap = new HashMap<>();
+        }
+        if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+            paramsMap.putAll(taskExecutionContext.getParamsMap());
+        }
+        Map<String,String> namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
+        String namespaceName = namespace.get(NAMESPACE_NAME);
+        String clusterName = namespace.get(CLUSTER);
+        k8sTaskMainParameters.setImage(k8sTaskParameters.getImage());
+        k8sTaskMainParameters.setNamespaceName(namespaceName);
+        k8sTaskMainParameters.setClusterName(clusterName);
+        k8sTaskMainParameters.setMinCpuCores(k8sTaskParameters.getMinCpuCores());
+        k8sTaskMainParameters.setMinMemorySpace(k8sTaskParameters.getMinMemorySpace());
+        k8sTaskMainParameters.setParamsMap(ParamUtils.convert(paramsMap));
+        return JSONUtils.toJsonString(k8sTaskMainParameters);
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannel.java
new file mode 100644
index 0000000000..3edb1c448d
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannel.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.k8s;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+public class K8sTaskChannel implements TaskChannel {
+    @Override
+    public void cancelApplication(boolean status) {
+
+    }
+
+    @Override
+    public AbstractParameters parseParameters(ParametersNode parametersNode) {
+        return JSONUtils.parseObject(parametersNode.getTaskParams(), K8sTaskParameters.class);
+    }
+
+    @Override
+    public ResourceParametersHelper getResources(String parameters) {
+        return null;
+    }
+
+    @Override
+    public AbstractTask createTask(TaskExecutionContext taskRequest) {
+        return new K8sTask(taskRequest);
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannelFactory.java
new file mode 100644
index 0000000000..dd8caac035
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannelFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.k8s;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+
+import java.util.List;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(TaskChannelFactory.class)
+public class K8sTaskChannelFactory implements TaskChannelFactory {
+    @Override
+    public String getName() {
+        return "K8S";
+    }
+
+    @Override
+    public List<PluginParams> getParams() {
+        return null;
+    }
+
+    @Override
+    public TaskChannel create() {
+        return new K8sTaskChannel();
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskParameters.java
new file mode 100644
index 0000000000..35f816ecba
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskParameters.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.k8s;
+
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * k8s task parameters
+ */
+public class K8sTaskParameters extends AbstractParameters {
+    private String image;
+    private String namespace;
+    private double minCpuCores;
+    private double minMemorySpace;
+
+    public String getImage() {
+        return image;
+    }
+
+    public void setImage(String image) {
+        this.image = image;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    public double getMinCpuCores() {
+        return minCpuCores;
+    }
+
+    public void setMinCpuCores(double minCpuCores) {
+        this.minCpuCores = minCpuCores;
+    }
+
+    public double getMinMemorySpace() {
+        return minMemorySpace;
+    }
+
+    public void setMinMemorySpace(double minMemorySpace) {
+        this.minMemorySpace = minMemorySpace;
+    }
+
+    @Override
+    public boolean checkParameters() {
+        return StringUtils.isNotEmpty(image) && StringUtils.isNotEmpty(namespace)
+            ;
+    }
+
+    @Override
+    public List<ResourceInfo> getResourceFilesList() {
+        return new ArrayList<>();
+    }
+
+    @Override
+    public String toString() {
+        return "K8sTaskParameters{"
+            + "image='" + image + '\''
+            + ", namespace='" + namespace + '\''
+            + ", minCpuCores=" + minCpuCores
+            + ", minMemorySpace=" + minMemorySpace
+            + '}';
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java
new file mode 100644
index 0000000000..6a8892b4e4
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.k8s;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class K8sParametersTest {
+    private K8sTaskParameters k8sTaskParameters = null;
+    private final String image = "ds-dev";
+    private final String namespace = "{\"name\":\"default\",\"cluster\":\"lab\"}";
+    private final double minCpuCores = 2;
+    private final double minMemorySpace = 10;
+
+    @Before
+    public void before() {
+        k8sTaskParameters = new K8sTaskParameters();
+        k8sTaskParameters.setImage(image);
+        k8sTaskParameters.setNamespace(namespace);
+        k8sTaskParameters.setMinCpuCores(minCpuCores);
+        k8sTaskParameters.setMinMemorySpace(minMemorySpace);
+    }
+
+    @Test
+    public void testCheckParameterNormal() {
+        Assert.assertTrue(k8sTaskParameters.checkParameters());
+    }
+
+    @Test
+    public void testGetResourceFilesListNormal() {
+       Assert.assertNotNull(k8sTaskParameters.getResourceFilesList());
+       Assert.assertEquals(0, k8sTaskParameters.getResourceFilesList().size());
+    }
+
+    @Test
+    public void testK8sParameters() {
+        Assert.assertEquals(image, k8sTaskParameters.getImage());
+        Assert.assertEquals(namespace, k8sTaskParameters.getNamespace());
+        Assert.assertEquals(0, Double.compare(minCpuCores, k8sTaskParameters.getMinCpuCores()));
+        Assert.assertEquals(0,Double.compare(minMemorySpace, k8sTaskParameters.getMinMemorySpace()));
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
new file mode 100644
index 0000000000..60506999e4
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.k8s;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class K8sTaskTest {
+    private K8sTaskParameters k8sTaskParameters = null;
+
+    private K8sTask k8sTask = null;
+    private final String image = "ds-dev";
+
+    private final String namespace = "{\"name\":\"default\",\"cluster\":\"lab\"}";
+
+    private final double minCpuCores = 2;
+
+    private final double minMemorySpace = 10;
+    private final int taskInstanceId = 1000;
+    private final String taskName = "k8s_task_test";
+
+    private final String DAY = "day";
+    private final String date = "20220507";
+    @Before
+    public void before() {
+        k8sTaskParameters = new K8sTaskParameters();
+        k8sTaskParameters.setImage(image);
+        k8sTaskParameters.setNamespace(namespace);
+        k8sTaskParameters.setMinCpuCores(minCpuCores);
+        k8sTaskParameters.setMinMemorySpace(minMemorySpace);
+        TaskExecutionContext taskRequest = new TaskExecutionContext();
+        taskRequest.setTaskInstanceId(taskInstanceId);
+        taskRequest.setTaskName(taskName);
+        taskRequest.setTaskParams(JSONUtils.toJsonString(k8sTaskParameters));
+        Property property = new Property();
+        property.setProp(DAY);
+        property.setDirect(Direct.IN);
+        property.setType(DataType.VARCHAR);
+        property.setValue(date);
+        Map<String, Property> paramsMap = new HashMap<>();
+        paramsMap.put(DAY,property);
+        taskRequest.setParamsMap(paramsMap);
+        k8sTask = new K8sTask(taskRequest);
+    }
+
+    @Test
+    public void testBuildCommandNormal() {
+        String expectedStr = "{\"image\":\"ds-dev\",\"namespaceName\":\"default\",\"clusterName\":\"lab\",\"minCpuCores\":2.0,\"minMemorySpace\":10.0,\"paramsMap\":{\"day\":\"20220507\"}}";
+        String commandStr = k8sTask.buildCommand();
+        Assert.assertEquals(expectedStr, commandStr);
+    }
+
+    @Test
+    public void testGetParametersNormal() {
+        String expectedStr = "K8sTaskParameters{image='ds-dev', namespace='{\"name\":\"default\",\"cluster\":\"lab\"}', minCpuCores=2.0, minMemorySpace=10.0}";
+        String result = k8sTask.getParameters().toString();
+        Assert.assertEquals(expectedStr, result);
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml
index 65a479019c..bfd8ea7b05 100644
--- a/dolphinscheduler-task-plugin/pom.xml
+++ b/dolphinscheduler-task-plugin/pom.xml
@@ -50,6 +50,7 @@
         <module>dolphinscheduler-task-all</module>
         <module>dolphinscheduler-task-emr</module>
         <module>dolphinscheduler-task-blocking</module>
+        <module>dolphinscheduler-task-k8s</module>
         <module>dolphinscheduler-task-zeppelin</module>
         <module>dolphinscheduler-task-jupyter</module>
     </modules>
diff --git a/dolphinscheduler-ui/public/images/task-icons/k8s.png b/dolphinscheduler-ui/public/images/task-icons/k8s.png
new file mode 100644
index 0000000000..efce2bf14a
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/k8s.png differ
diff --git a/dolphinscheduler-ui/public/images/task-icons/k8s_hover.png b/dolphinscheduler-ui/public/images/task-icons/k8s_hover.png
new file mode 100644
index 0000000000..ba79a67d22
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/k8s_hover.png differ
diff --git a/dolphinscheduler-ui/src/locales/modules/en_US.ts b/dolphinscheduler-ui/src/locales/modules/en_US.ts
index 3395adaa85..1a88fabb6b 100644
--- a/dolphinscheduler-ui/src/locales/modules/en_US.ts
+++ b/dolphinscheduler-ui/src/locales/modules/en_US.ts
@@ -650,6 +650,14 @@ const project = {
     failed_retry_interval: 'Failed retry interval',
     minute: 'Minute',
     delay_execution_time: 'Delay execution time',
+    namespace_cluster: 'Namespace(Cluster)',
+    min_cpu: 'Min cpu',
+    min_memory: 'Min memory',
+    cores: 'Cores',
+    mb: 'MB',
+    image: 'Image',
+    image_tips: 'Please enter image',
+    min_memory_tips: 'Please enter min memory',
     state: 'State',
     branch_flow: 'Branch flow',
     cancel: 'Cancel',
diff --git a/dolphinscheduler-ui/src/locales/modules/zh_CN.ts b/dolphinscheduler-ui/src/locales/modules/zh_CN.ts
index f3a5e38578..576a782120 100644
--- a/dolphinscheduler-ui/src/locales/modules/zh_CN.ts
+++ b/dolphinscheduler-ui/src/locales/modules/zh_CN.ts
@@ -645,6 +645,14 @@ const project = {
     failed_retry_interval: '失败重试间隔',
     minute: '分',
     delay_execution_time: '延时执行时间',
+    namespace_cluster: '命名空间(集群)',
+    min_cpu: '最小cpu',
+    min_memory: '最小内存',
+    cores: '核',
+    mb: 'MB',
+    image: '镜像',
+    image_tips: '请输入镜像',
+    min_memory_tips: '请输入最小内存',
     state: '状态',
     branch_flow: '分支流转',
     cancel: '取消',
diff --git a/dolphinscheduler-ui/src/service/modules/k8s-namespace/index.ts b/dolphinscheduler-ui/src/service/modules/k8s-namespace/index.ts
index 2df1d5f703..524cac5b23 100644
--- a/dolphinscheduler-ui/src/service/modules/k8s-namespace/index.ts
+++ b/dolphinscheduler-ui/src/service/modules/k8s-namespace/index.ts
@@ -27,6 +27,13 @@ export function queryNamespaceListPaging(params: ListReq): any {
   })
 }
 
+export function getAllNamespaces(): any {
+  return axios({
+    url: '/k8s-namespace/available-list',
+    method: 'get'
+  })
+}
+
 export function verifyNamespaceK8s(params: K8SReq): any {
   return axios({
     url: '/k8s-namespace/verify',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
index 59550af8fa..d4a733814f 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
@@ -62,4 +62,6 @@ export { useConditions } from './use-conditions'
 export { useDependent } from './use-dependent'
 export { useEmr } from './use-emr'
 export { useZeppelin } from './use-zeppelin'
+export { useNamespace } from './use-namespace'
+export { useK8s } from './use-k8s'
 export { useJupyter } from './use-jupyter'
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts
new file mode 100644
index 0000000000..f1635bea32
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { useCustomParams, useNamespace } from '.'
+import type { IJsonItem } from '../types'
+import { useI18n } from 'vue-i18n'
+
+export function useK8s(model: { [field: string]: any }): IJsonItem[] {
+  const { t } = useI18n()
+
+  return [
+    useNamespace(),
+    {
+      type: 'input-number',
+      field: 'minCpuCores',
+      span: 12,
+      name: t('project.node.min_cpu'),
+      slots: {
+        suffix: () => t('project.node.cores')
+      }
+    },
+    {
+      type: 'input-number',
+      field: 'minMemorySpace',
+      span: 12,
+      name: t('project.node.min_memory'),
+      slots: {
+        suffix: () => t('project.node.mb')
+      }
+    },
+    {
+      type: 'input',
+      field: 'image',
+      name: t('project.node.image'),
+      props: {
+        placeholder: t('project.node.image_tips')
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        message: t('project.node.min_memory_tips')
+      }
+    },
+    ...useCustomParams({ model, field: 'localParams', isSimple: true })
+  ]
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts
new file mode 100644
index 0000000000..983fd6506a
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import type { IJsonItem } from '../types'
+import { useI18n } from 'vue-i18n'
+import { onMounted, ref, VNodeChild } from 'vue'
+import { getAllNamespaces } from '@/service/modules/k8s-namespace'
+import { SelectOption } from 'naive-ui'
+
+export function useNamespace(): IJsonItem {
+  const { t } = useI18n()
+
+  const options = ref([])
+  const loading = ref(false)
+
+  const getNamespaceList = async () => {
+    if (loading.value) return
+    loading.value = true
+    const totalList = await getAllNamespaces()
+    options.value = (totalList || []).map(
+      (item: { id: string; namespace: string; k8s: string }) => ({
+        label: `${item.namespace}(${item.k8s})`,
+        value: JSON.stringify({
+          name: item.namespace,
+          cluster: item.k8s
+        })
+      })
+    )
+    loading.value = false
+  }
+
+  onMounted(() => {
+    getNamespaceList()
+  })
+
+  const renderLabel = (option: SelectOption): VNodeChild => {
+    if (option.type === 'group') return option.label as string
+    return [option.label as string]
+  }
+
+  return {
+    type: 'select',
+    field: 'namespace',
+    name: t('project.node.namespace_cluster'),
+    props: {
+      loading,
+      'render-label': renderLabel
+    },
+    options: [
+      {
+        type: 'group',
+        label: t('project.node.namespace_cluster'),
+        key: t('project.node.namespace_cluster'),
+        children: options as any
+      }
+    ]
+  }
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 85e15a941c..a487d3802e 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -310,6 +310,13 @@ export function formatParams(data: INodeData): {
     taskParams.paragraphId = data.zeppelinParagraphId
   }
 
+  if (data.taskType === 'K8S') {
+    taskParams.namespace = data.namespace
+    taskParams.minCpuCores = data.minCpuCores
+    taskParams.minMemorySpace = data.minMemorySpace
+    taskParams.image = data.image
+  }
+
   if (data.taskType === 'JUPYTER') {
     taskParams.condaEnvName = data.condaEnvName
     taskParams.inputNotePath = data.inputNotePath
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
index 2e1eadcd7a..a9a0671dc0 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
@@ -34,6 +34,7 @@ import { useDependent } from './use-dependent'
 import { useDataQuality } from './use-data-quality'
 import { useEmr } from './use-emr'
 import { useZeppelin } from './use-zeppelin'
+import { useK8s } from './use-k8s'
 import { useJupyter } from './use-jupyter'
 
 export default {
@@ -56,5 +57,6 @@ export default {
   DATA_QUALITY: useDataQuality,
   EMR: useEmr,
   ZEPPELIN: useZeppelin,
+  K8S: useK8s,
   JUPYTER: useJupyter
 }
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts
new file mode 100644
index 0000000000..a55425ffb5
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import * as Fields from '../fields/index'
+import type { IJsonItem, INodeData } from '../types'
+import { ITaskData } from '../types'
+
+export function useK8s({
+  projectCode,
+  from = 0,
+  readonly,
+  data
+}: {
+  projectCode: number
+  from?: number
+  readonly?: boolean
+  data?: ITaskData
+}) {
+  const model = reactive({
+    name: '',
+    taskType: 'K8S',
+    flag: 'YES',
+    description: '',
+    timeoutFlag: false,
+    localParams: [],
+    environmentCode: null,
+    failRetryInterval: 1,
+    failRetryTimes: 0,
+    workerGroup: 'default',
+    delayTime: 0,
+    timeout: 30
+  } as INodeData)
+
+  let extra: IJsonItem[] = []
+  if (from === 1) {
+    extra = [
+      Fields.useTaskType(model, readonly),
+      Fields.useProcessName({
+        model,
+        projectCode,
+        isCreate: !data?.id,
+        from,
+        processName: data?.processName
+      })
+    ]
+  }
+
+  return {
+    json: [
+      Fields.useName(),
+      ...extra,
+      Fields.useRunFlag(),
+      Fields.useDescription(),
+      Fields.useTaskPriority(),
+      Fields.useWorkerGroup(),
+      Fields.useEnvironmentName(model, !model.id),
+      ...Fields.useTaskGroup(model, projectCode),
+      ...Fields.useFailed(),
+      Fields.useDelayTime(model),
+      ...Fields.useTimeoutAlarm(model),
+      ...Fields.useK8s(model),
+      Fields.usePreTasks()
+    ] as IJsonItem[],
+    model
+  }
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index b785d0f7a0..c98ae48a9a 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -306,6 +306,12 @@ interface ITaskParams {
   udfs?: string
   connParams?: string
   targetJobName?: string
+  cluster?: string
+  namespace?: string
+  clusterNamespace?: string
+  minCpuCores?: string
+  minMemorySpace?: string
+  image?: string
 }
 
 interface INodeData
diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
index 25deefdef2..b3b68037c0 100644
--- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
@@ -34,6 +34,7 @@ export type TaskType =
   | 'SEATUNNEL'
   | 'EMR'
   | 'ZEPPELIN'
+  | 'K8S'
   | 'JUPYTER'
 
 export const TASK_TYPES_MAP = {
@@ -103,5 +104,9 @@ export const TASK_TYPES_MAP = {
   JUPYTER: {
     alias: 'JUPYTER',
     helperLinkDisable: true
+  },
+  K8S: {
+    alias: 'K8S',
+    helperLinkDisable: true
   }
 } as { [key in TaskType]: { alias: string; helperLinkDisable?: boolean } }
diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
index 8cc075c901..3fbc70fa48 100644
--- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
+++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
@@ -158,6 +158,9 @@ $bgLight: #ffffff;
     &.icon-zeppelin {
       background-image: url('/images/task-icons/zeppelin.png');
     }
+    &.icon-k8s {
+      background-image: url('/images/task-icons/k8s.png');
+    }
     &.icon-jupyter {
       background-image: url('/images/task-icons/jupyter.png');
     }
@@ -222,6 +225,9 @@ $bgLight: #ffffff;
       &.icon-zeppelin {
         background-image: url('/images/task-icons/zeppelin_hover.png');
       }
+      &.icon-k8s {
+        background-image: url('/images/task-icons/k8s_hover.png');
+      }
       &.icon-jupyter {
         background-image: url('/images/task-icons/jupyter_hover.png');
       }