You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/11/11 08:08:44 UTC

[dolphinscheduler] branch dev updated: [feature][task] Add Kubeflow task plugin for MLOps scenario (#12843)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 229c554912 [feature][task] Add Kubeflow task plugin for MLOps scenario (#12843)
229c554912 is described below

commit 229c5549129dabaeaf721dd15af9e8f73dcfadf9
Author: JieguangZhou <ji...@163.com>
AuthorDate: Fri Nov 11 16:08:38 2022 +0800

    [feature][task] Add Kubeflow task plugin for MLOps scenario (#12843)
---
 docs/configs/docsdev.js                            |   8 ++
 docs/docs/en/guide/task/kubeflow.md                |  42 ++++++
 docs/docs/zh/guide/task/kubeflow.md                |  40 ++++++
 docs/img/tasks/demo/kubeflow.png                   | Bin 0 -> 72081 bytes
 docs/img/tasks/icons/kubeflow.png                  | Bin 0 -> 159122 bytes
 .../src/main/resources/task-type-config.yaml       |   3 +-
 .../master/runner/task/BaseTaskProcessor.java      |   4 +-
 .../dolphinscheduler-task-all/pom.xml              |   6 +
 .../plugin/task/api/TaskConstants.java             |   5 +
 .../dolphinscheduler-task-kubeflow/pom.xml         |  50 +++++++
 .../plugin/kubeflow/KubeflowHelper.java            | 126 +++++++++++++++++
 .../plugin/kubeflow/KubeflowParameters.java        |  37 +++++
 .../plugin/kubeflow/KubeflowTask.java              | 153 +++++++++++++++++++++
 .../plugin/kubeflow/KubeflowTaskChannel.java       |  48 +++++++
 .../kubeflow/KubeflowTaskChannelFactory.java       |  47 +++++++
 .../plugin/kubeflow/KubeflowHelperTest.java        |  99 +++++++++++++
 .../plugin/kubeflow/KubeflowTaskTest.java          | 153 +++++++++++++++++++++
 .../src/test/resources/clusterConfigYAML.yaml      |  60 +++-----
 .../src/test/resources/jobConfigYAML.yaml          |  60 +++-----
 dolphinscheduler-task-plugin/pom.xml               |   1 +
 .../public/images/task-icons/kubeflow.png          | Bin 0 -> 159122 bytes
 .../public/images/task-icons/kubeflow_hover.png    | Bin 0 -> 118124 bytes
 dolphinscheduler-ui/src/store/project/task-type.ts |   4 +
 dolphinscheduler-ui/src/store/project/types.ts     |   1 +
 .../projects/task/components/node/fields/index.ts  |   1 +
 .../task/components/node/fields/use-kubeflow.ts    |  38 +++++
 .../projects/task/components/node/format-data.ts   |   5 +
 .../projects/task/components/node/tasks/index.ts   |   7 +-
 .../task/components/node/tasks/use-kubeflow.ts     |  67 +++++++++
 .../views/projects/task/components/node/types.ts   |   1 +
 .../src/views/projects/task/constants/task-type.ts |   5 +
 .../workflow/components/dag/dag.module.scss        |   6 +
 32 files changed, 990 insertions(+), 87 deletions(-)

diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js
index 279fa2fc8a..f0a079ad91 100644
--- a/docs/configs/docsdev.js
+++ b/docs/configs/docsdev.js
@@ -213,6 +213,10 @@ export default {
                                 title: 'AWS Datasync',
                                 link: '/en-us/docs/dev/user_doc/guide/task/datasync.html',
                             },
+                            {
+                                title: 'Kubeflow',
+                                link: '/en-us/docs/dev/user_doc/guide/task/kubeflow.html',
+                            },
                         ],
                     },
                     {
@@ -869,6 +873,10 @@ export default {
                                 title: 'AWS Datasync',
                                 link: '/zh-cn/docs/dev/user_doc/guide/task/datasync.html',
                             },
+                            {
+                                title: 'Kubeflow',
+                                link: '/zh-cn/docs/dev/user_doc/guide/task/kubeflow.html',
+                            },
                         ],
                     },
                     {
diff --git a/docs/docs/en/guide/task/kubeflow.md b/docs/docs/en/guide/task/kubeflow.md
new file mode 100644
index 0000000000..63918ab438
--- /dev/null
+++ b/docs/docs/en/guide/task/kubeflow.md
@@ -0,0 +1,42 @@
+# Kubeflow Node
+
+## Overview
+
+[Kubeflow](https://www.kubeflow.org) task type is used to create tasks on Kubeflow.
+
+The backend mainly uses the `kubectl` command to create kubeflow tasks, and continues to monitor the resource status on Kubeflow until the task is completed.
+
+Currently, it mainly supports creating kubeflow tasks using yaml files. If you need to publish `kubeflow pipeline` tasks, you can use the [python task type](./python.md).
+
+## Create Task
+
+- Click `Project Management -> Project Name -> Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page.
+- Drag <img src="../../../../img/tasks/icons/kubeflow.png" width="15"/> from the toolbar to the canvas.
+
+## Task Example
+
+The task plugin picture is as follows
+
+![kubeflow](../../../../img/tasks/demo/kubeflow.png)
+
+### First, introduce some general parameters of DolphinScheduler
+
+- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters.
+
+### Here are some specific parameters for the Kubeflow plugin
+
+- **Namespace**: The namespace parameter of the cluster
+- **yamlContent**: CRD YAML file content
+
+## Environment Configuration
+
+**Configure Kubernetes environment**
+
+Refer to [Cluster Management and Namespace Management](../security.md).
+
+Only the required fields need to be filled in; the others can be left empty. The resource management depends on the YAML file definition in the specific Job.
+
+**kubectl**
+
+Install [kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/), and make sure `kubectl` can submit tasks to kubeflow normally.
+
diff --git a/docs/docs/zh/guide/task/kubeflow.md b/docs/docs/zh/guide/task/kubeflow.md
new file mode 100644
index 0000000000..d5b2b9a404
--- /dev/null
+++ b/docs/docs/zh/guide/task/kubeflow.md
@@ -0,0 +1,40 @@
+# Kubeflow
+
+## 综述
+
+[Kubeflow](https://www.kubeflow.org) 任务类型,用于在Kubeflow上创建任务。
+后台主要使用 `kubectl` 命令来创建kubeflow任务, 并持续监控Kubeflow上资源状态直至任务完成。
+目前主要支持通过使用yaml文件来创建kubeflow任务。 如果需要发布`kubeflow pipeline`任务可以使用[python任务类型](./python.md)。
+
+## 创建任务
+
+- 点击项目管理-项目名称-工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面;
+- 拖动工具栏的 <img src="../../../../img/tasks/icons/kubeflow.png" width="15"/> 任务节点到画板中。
+
+## 任务样例
+
+组件图示如下:
+
+![kubeflow](../../../../img/tasks/demo/kubeflow.png)
+
+### 首先介绍一些DS通用参数
+
+- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。
+
+### Kubeflow组件独有的参数
+
+- **Namespace**:集群命名空间参数
+- **yamlContent**:CRD YAML文件内容
+
+## 环境配置
+
+**配置Kubernetes环境**
+
+参考[集群管理和命名空间管理](../security.md)。
+
+只需填写必填项即可,其他无需填写,资源管理依赖于具体Job中的YAML文件定义。
+
+**kubectl**
+
+安装[kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/),并确保`kubectl`能正常提交任务到kubeflow。
+
diff --git a/docs/img/tasks/demo/kubeflow.png b/docs/img/tasks/demo/kubeflow.png
new file mode 100644
index 0000000000..ac80c2e536
Binary files /dev/null and b/docs/img/tasks/demo/kubeflow.png differ
diff --git a/docs/img/tasks/icons/kubeflow.png b/docs/img/tasks/icons/kubeflow.png
new file mode 100644
index 0000000000..72121f114b
Binary files /dev/null and b/docs/img/tasks/icons/kubeflow.png differ
diff --git a/dolphinscheduler-api/src/main/resources/task-type-config.yaml b/dolphinscheduler-api/src/main/resources/task-type-config.yaml
index 0ce770c2c5..7144b4a2c9 100644
--- a/dolphinscheduler-api/src/main/resources/task-type-config.yaml
+++ b/dolphinscheduler-api/src/main/resources/task-type-config.yaml
@@ -51,8 +51,9 @@ task:
     - 'DVC'
     - 'SAGEMAKER'
     - 'PYTORCH'
+    - 'KUBEFLOW'
   other:
     - 'PIGEON'
     - 'ZEPPELIN'
     - 'CHUNJUN'
-    - 'DATASYNC'
\ No newline at end of file
+    - 'DATASYNC'
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 31050fb163..a47df6979a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -26,7 +26,7 @@ import static org.apache.dolphinscheduler.common.constants.Constants.SINGLE_SLAS
 import static org.apache.dolphinscheduler.common.constants.Constants.USER;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_K8S;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SET_K8S;
 import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME;
 import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE;
 import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE;
@@ -325,7 +325,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
             setDataQualityTaskRelation(dataQualityTaskExecutionContext, taskInstance, tenant.getTenantCode());
         }
         K8sTaskExecutionContext k8sTaskExecutionContext = new K8sTaskExecutionContext();
-        if (TASK_TYPE_K8S.equalsIgnoreCase(taskInstance.getTaskType())) {
+        if (TASK_TYPE_SET_K8S.contains(taskInstance.getTaskType())) {
             setK8sTaskRelation(k8sTaskExecutionContext, taskInstance);
         }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
index 256aa99144..d355ecf02d 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
@@ -230,6 +230,12 @@
             <artifactId>dolphinscheduler-task-datasync</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-kubeflow</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
index 8ba7f5fa6b..bb78d178c8 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
@@ -18,6 +18,9 @@
 package org.apache.dolphinscheduler.plugin.task.api;
 
 import java.time.Duration;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
 
 public class TaskConstants {
 
@@ -433,6 +436,8 @@ public class TaskConstants {
 
     public static final String TASK_TYPE_K8S = "K8S";
 
+    public static final Set<String> TASK_TYPE_SET_K8S = Sets.newHashSet("K8S", "KUBEFLOW");
+
     public static final String TASK_TYPE_BLOCKING = "BLOCKING";
 
     public static final String TASK_TYPE_STREAM = "STREAM";
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/pom.xml
new file mode 100644
index 0000000000..3fab0aeee4
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <artifactId>dolphinscheduler-task-plugin</artifactId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>dolphinscheduler-task-kubeflow</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-datasource-all</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-spi</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-datasource-api</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowHelper.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowHelper.java
new file mode 100644
index 0000000000..e01438a982
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowHelper.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.kubeflow;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Sets;
+
+/**
+ * Builds the {@code kubectl} shell scripts used by the Kubeflow task and interprets the JSON
+ * printed by {@code kubectl get -o json}.
+ *
+ * <p>Every generated script has two lines: the first exports {@code KUBECONFIG} with the path
+ * given to the constructor, the second is the kubectl invocation for the given YAML file.
+ */
+public class KubeflowHelper {
+
+    protected final Logger logger =
+            LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
+
+    private final String clusterConfigPath;
+
+    /** Number of {@code status.conditions} entries that {@link #parseGetMessage(String)} has already logged. */
+    private int messageIndex = 0;
+
+    /**
+     * @param clusterConfigPath path of the cluster configuration file that is exported as {@code KUBECONFIG}
+     */
+    public KubeflowHelper(String clusterConfigPath) {
+        this.clusterConfigPath = clusterConfigPath;
+    }
+
+    /**
+     * @param yamlFilePATH path of the YAML file to apply
+     * @return script that exports {@code KUBECONFIG} and runs {@code kubectl apply -f}
+     */
+    public String buildSubmitCommand(String yamlFilePATH) {
+        return buildCommand(COMMAND.APPLY, yamlFilePATH);
+    }
+
+    /**
+     * @param yamlFilePATH path of the YAML file whose resource is queried
+     * @return script that exports {@code KUBECONFIG} and runs {@code kubectl get -f ... -o json}
+     */
+    public String buildGetCommand(String yamlFilePATH) {
+        return buildCommand(COMMAND.GET, yamlFilePATH);
+    }
+
+    /**
+     * @param yamlFilePATH path of the YAML file whose resource is deleted
+     * @return script that exports {@code KUBECONFIG} and runs {@code kubectl delete -f}
+     */
+    public String buildDeleteCommand(String yamlFilePATH) {
+        return buildCommand(COMMAND.DELETE, yamlFilePATH);
+    }
+
+    /** Joins the {@code KUBECONFIG} export and one formatted kubectl template with a newline. */
+    private String buildCommand(String kubectlTemplate, String yamlFilePATH) {
+        List<String> lines = new ArrayList<>();
+        lines.add(String.format(COMMAND.SET_CONFIG, clusterConfigPath));
+        lines.add(String.format(kubectlTemplate, yamlFilePATH));
+        return String.join("\n", lines);
+    }
+
+    /**
+     * Extracts {@code status.phase} from the JSON output of the get command.
+     *
+     * <p>As a side effect, every entry of {@code status.conditions} that has not been logged by
+     * an earlier call on this instance is written to the task log.
+     *
+     * @param message JSON text printed by the get command
+     * @return the phase text, or an empty string when no JSON object, no {@code status} or no
+     *         {@code phase} is available
+     */
+    public String parseGetMessage(String message) {
+        JsonNode data = JSONUtils.parseObject(message);
+        // NOTE(review): JSONUtils.parseObject is assumed to be able to return null (e.g. for empty
+        // output); treat that like "no status yet" instead of failing with a NullPointerException.
+        if (data == null || !data.has("status")) {
+            return "";
+        }
+        JsonNode status = data.get("status");
+
+        if (status.has("conditions")) {
+            JsonNode conditions = status.get("conditions");
+            // Start at messageIndex so that conditions logged by a previous poll are not repeated.
+            for (int x = messageIndex; x < conditions.size(); x = x + 1) {
+                logger.info(conditions.get(x).toString());
+            }
+            messageIndex = conditions.size();
+        }
+
+        return status.has("phase") ? status.get("phase").asText() : "";
+    }
+
+    /**
+     * Value that {@code KubeflowTask#submitApplication()} serializes to JSON and stores as the
+     * task's app ids after a submit.
+     */
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class ApplicationIds {
+
+        boolean isAlreadySubmitted;
+    }
+
+    /** Phase values that end the status polling in {@code KubeflowTask#trackApplicationStatus()}. */
+    public static class STATUS {
+
+        public static final HashSet<String> SUCCESS_SET = Sets.newHashSet("Succeeded", "Available", "Bound");
+        public static final HashSet<String> FAILED_SET = Sets.newHashSet("Failed");
+
+    }
+
+    /** Polling interval and the file names written into the task's execute path. */
+    public static class CONSTANTS {
+
+        // Passed to ThreadUtils.sleep between polls; presumably milliseconds - confirm.
+        public static final int TRACK_INTERVAL = 3000;
+        public static final String YAML_FILE_PATH = "kubeflow.yaml";
+        public static final String CLUSTER_CONFIG_PATH = ".cluster.yaml";
+    }
+
+    /** {@link String#format} templates of the shell lines; each takes exactly one path argument. */
+    public static class COMMAND {
+
+        public static final String SET_CONFIG = "export KUBECONFIG=%s";
+        public static final String APPLY = "kubectl apply -f %s";
+        public static final String GET = "kubectl get -f %s -o json";
+        public static final String DELETE = "kubectl delete -f %s";
+
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowParameters.java
new file mode 100644
index 0000000000..727cbc0e47
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowParameters.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.kubeflow;
+
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.Data;
+
+/**
+ * Parameter object of the Kubeflow task.
+ */
+@Data
+public class KubeflowParameters extends AbstractParameters {
+
+    /** Content of the YAML resource definition; the only field checked by {@link #checkParameters()}. */
+    private String yamlContent;
+
+    /** Cluster configuration YAML; {@code KubeflowTask#init()} sets it from the task execution context. */
+    private String clusterYAML;
+
+    /**
+     * @return {@code true} when {@code yamlContent} is neither {@code null} nor empty
+     */
+    public boolean checkParameters() {
+        boolean yamlMissing = StringUtils.isEmpty(yamlContent);
+        return !yamlMissing;
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java
new file mode 100644
index 0000000000..e55ff100dd
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.kubeflow;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Task that applies a YAML resource definition with {@code kubectl apply}, polls it with
+ * {@code kubectl get} until a phase from {@code KubeflowHelper.STATUS} is reported, and removes
+ * it with {@code kubectl delete} when the application is cancelled.
+ */
+public class KubeflowTask extends AbstractRemoteTask {
+
+    private final TaskExecutionContext taskExecutionContext;
+    protected KubeflowHelper kubeflowHelper;
+    private KubeflowParameters kubeflowParameters;
+    private Path clusterYAMLPath;
+
+    private Path yamlPath;
+
+    public KubeflowTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+    }
+
+    /**
+     * Parses the task parameters, copies the cluster configuration from the K8s execution
+     * context into them, writes both YAML files and creates the command helper.
+     *
+     * @throws TaskException when the parameters are missing or invalid, or the files cannot be written
+     */
+    @Override
+    public void init() throws TaskException {
+        logger.info("Kubeflow task params {}", taskExecutionContext.getTaskParams());
+        kubeflowParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), KubeflowParameters.class);
+
+        // Report missing input as a TaskException instead of a NullPointerException further down.
+        if (kubeflowParameters == null || taskExecutionContext.getK8sTaskExecutionContext() == null) {
+            throw new TaskException("Kubeflow task params is not valid");
+        }
+
+        kubeflowParameters.setClusterYAML(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml());
+        if (!kubeflowParameters.checkParameters()) {
+            throw new TaskException("Kubeflow task params is not valid");
+        }
+
+        writeFiles();
+        kubeflowHelper = new KubeflowHelper(clusterYAMLPath.toString());
+    }
+
+    /**
+     * Runs the apply command and records an {@code ApplicationIds} marker as the task's app ids.
+     *
+     * @throws TaskException when the command fails
+     */
+    @Override
+    public void submitApplication() throws TaskException {
+        String command = kubeflowHelper.buildSubmitCommand(yamlPath.toString());
+        logger.info("Kubeflow task submit command: \n{}", command);
+        String message = runCommand(command);
+        logger.info("Kubeflow task submit result: \n{}", message);
+
+        KubeflowHelper.ApplicationIds applicationIds = new KubeflowHelper.ApplicationIds();
+        applicationIds.setAlreadySubmitted(true);
+        setAppIds(JSONUtils.toJsonString(applicationIds));
+    }
+
+    /**
+     * Polls the resource with the get command until its phase is in
+     * {@code KubeflowHelper.STATUS.FAILED_SET} or {@code SUCCESS_SET}, then sets the exit status code.
+     *
+     * <p>NOTE(review): the loop has no timeout or interruption check of its own; it only ends on a
+     * terminal phase or when {@link #runCommand(String)} throws. Confirm that task timeout and
+     * kill handling are enforced by the caller.
+     *
+     * @throws TaskException when the get command fails
+     */
+    @Override
+    public void trackApplicationStatus() throws TaskException {
+        String command = kubeflowHelper.buildGetCommand(yamlPath.toString());
+        logger.info("Kubeflow task get command: \n{}", command);
+        do {
+            ThreadUtils.sleep(KubeflowHelper.CONSTANTS.TRACK_INTERVAL);
+            String message = runCommand(command);
+            String phase = kubeflowHelper.parseGetMessage(message);
+            if (KubeflowHelper.STATUS.FAILED_SET.contains(phase)) {
+                exitStatusCode = TaskConstants.EXIT_CODE_FAILURE;
+                logger.info("Kubeflow task get Failed result: \n{}", message);
+                break;
+            } else if (KubeflowHelper.STATUS.SUCCESS_SET.contains(phase)) {
+                exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+                logger.info("Kubeflow task get Succeeded result: \n{}", message);
+                break;
+            }
+        } while (true);
+
+    }
+
+    /**
+     * Runs the delete command for the YAML file written by {@link #writeFiles()}.
+     *
+     * @throws TaskException when the command fails
+     */
+    @Override
+    public void cancelApplication() throws TaskException {
+        String command = kubeflowHelper.buildDeleteCommand(yamlPath.toString());
+        logger.info("Kubeflow task delete command: \n{}", command);
+        String message = runCommand(command);
+        logger.info("Kubeflow task delete result: \n{}", message);
+    }
+
+    /**
+     * Executes the script through {@code sh -c}.
+     *
+     * <p>The exit status code is set to success before the command runs and to failure when the
+     * execution throws.
+     *
+     * @param command shell script to execute
+     * @return whatever {@code OSUtils.exeShell} returns for the command
+     * @throws TaskException wrapping any exception raised by the execution
+     */
+    protected String runCommand(String command) {
+        try {
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+            return OSUtils.exeShell(new String[]{"sh", "-c", command});
+        } catch (Exception e) {
+            exitStatusCode = TaskConstants.EXIT_CODE_FAILURE;
+            throw new TaskException("Kubeflow task submit command failed", e);
+        }
+    }
+
+    /**
+     * @return always an empty list
+     */
+    @Override
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
+    }
+
+    /**
+     * Writes the parameter-substituted task YAML and the cluster configuration YAML into the
+     * task's execute path and remembers both paths.
+     *
+     * @throws TaskException when the cluster configuration is missing or a file cannot be written
+     */
+    public void writeFiles() {
+        String yamlContent = kubeflowParameters.getYamlContent();
+        String clusterYAML = kubeflowParameters.getClusterYAML();
+        if (clusterYAML == null) {
+            // Without this guard the write below fails with a NullPointerException.
+            throw new TaskException("Kubeflow task cluster config yaml is empty");
+        }
+
+        Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
+        yamlContent = ParameterUtils.convertParameterPlaceholders(yamlContent, ParamUtils.convert(paramsMap));
+
+        yamlPath = Paths.get(taskExecutionContext.getExecutePath(), KubeflowHelper.CONSTANTS.YAML_FILE_PATH);
+        clusterYAMLPath =
+                Paths.get(taskExecutionContext.getExecutePath(), KubeflowHelper.CONSTANTS.CLUSTER_CONFIG_PATH);
+
+        logger.info("Kubeflow task yaml content: \n{}", yamlContent);
+        try {
+            // TRUNCATE_EXISTING: with CREATE alone an existing, longer file keeps its old tail,
+            // which would leave stale content after the new YAML.
+            // UTF-8 is used explicitly so the bytes do not depend on the platform default charset.
+            Files.write(yamlPath, yamlContent.getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
+            Files.write(clusterYAMLPath, clusterYAML.getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
+        } catch (IOException e) {
+            throw new TaskException("Kubeflow task write yaml file failed", e);
+        }
+    }
+
+    @Override
+    public KubeflowParameters getParameters() {
+        return kubeflowParameters;
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskChannel.java
new file mode 100644
index 0000000000..5bfdc9e258
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskChannel.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.kubeflow;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+
+/**
+ * {@link TaskChannel} of the Kubeflow plugin: creates {@link KubeflowTask} instances and parses
+ * task parameters into {@link KubeflowParameters}.
+ */
+public class KubeflowTaskChannel implements TaskChannel {
+
+    /** No-op: this channel performs no work here. */
+    @Override
+    public void cancelApplication(boolean status) {
+        // nothing to do
+    }
+
+    /**
+     * @param taskRequest execution context handed to the new task
+     * @return a new {@link KubeflowTask} bound to {@code taskRequest}
+     */
+    @Override
+    public KubeflowTask createTask(TaskExecutionContext taskRequest) {
+        KubeflowTask task = new KubeflowTask(taskRequest);
+        return task;
+    }
+
+    /**
+     * @param parametersNode node whose task params JSON is parsed
+     * @return the params deserialized as {@link KubeflowParameters}
+     */
+    @Override
+    public AbstractParameters parseParameters(ParametersNode parametersNode) {
+        KubeflowParameters parameters =
+                JSONUtils.parseObject(parametersNode.getTaskParams(), KubeflowParameters.class);
+        return parameters;
+    }
+
+    /**
+     * @return always {@code null}
+     */
+    @Override
+    public ResourceParametersHelper getResources(String parameters) {
+        return null;
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskChannelFactory.java
new file mode 100644
index 0000000000..9481311ce6
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskChannelFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.kubeflow;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.auto.service.AutoService;
+
+/** Factory registered via {@code @AutoService}; creates {@link KubeflowTaskChannel} under the name "KUBEFLOW". */
+@AutoService(TaskChannelFactory.class)
+public class KubeflowTaskChannelFactory implements TaskChannelFactory {
+
+    @Override
+    public TaskChannel create() {
+        return new KubeflowTaskChannel();
+    }
+
+    @Override
+    public String getName() {
+        return "KUBEFLOW";
+    }
+
+    @Override
+    public List<PluginParams> getParams() {
+        return Collections.emptyList(); // no plugin params are declared
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowHelperTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowHelperTest.java
new file mode 100644
index 0000000000..4edcafa671
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowHelperTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.kubeflow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Tests the kubectl command strings built by KubeflowHelper and the phase it extracts from a JSON payload. */
+public class KubeflowHelperTest {
+
+    private String clusterConfigPath = "/tmp/dolphinscheduler/.kube/config";
+
+    private KubeflowHelper kubeflowHelper;
+
+    @BeforeEach
+    public void init() {
+        kubeflowHelper = new KubeflowHelper(clusterConfigPath);
+    }
+
+    @Test
+    public void testBuildSubmitCommand() {
+        String yamlFilePATH = "/tmp/dolphinscheduler/test.yaml";
+        String command = kubeflowHelper.buildSubmitCommand(yamlFilePATH);
+        String expectCommand = String.format("export KUBECONFIG=%s\n", clusterConfigPath) +
+                String.format("kubectl apply -f %s", yamlFilePATH);
+        Assertions.assertEquals(expectCommand, command);
+    }
+
+    @Test
+    public void testBuildGetCommand() {
+        String yamlFilePATH = "/tmp/dolphinscheduler/test.yaml";
+        String command = kubeflowHelper.buildGetCommand(yamlFilePATH);
+        String expectCommand = String.format("export KUBECONFIG=%s\n", clusterConfigPath) +
+                String.format("kubectl get -f %s -o json", yamlFilePATH);
+        Assertions.assertEquals(expectCommand, command);
+    }
+
+    @Test
+    public void testBuildDeleteCommand() {
+        String yamlFilePATH = "/tmp/dolphinscheduler/test.yaml";
+        String command = kubeflowHelper.buildDeleteCommand(yamlFilePATH);
+        String expectCommand = String.format("export KUBECONFIG=%s\n", clusterConfigPath) +
+                String.format("kubectl delete -f %s", yamlFilePATH);
+        Assertions.assertEquals(expectCommand, command);
+    }
+
+    @Test
+    public void testParseGetMessage() {
+        String message = "{\n" +
+                "    \"apiVersion\": \"kubeflow.org/v1\",\n" +
+                "    \"kind\": \"PyTorchJob\",\n" +
+                "    \"status\": {\n" +
+                "        \"conditions\": [\n" +
+                "            {\n" +
+                "                \"key\": \"value\"\n" +
+                "            },\n" +
+                "            {\n" +
+                "                \"key\": \"value\"\n" +
+                "            }\n" +
+                "        ],\n" +
+                "        \"phase\": \"Succeeded\"\n" +
+                "    }\n" +
+                "}\n";
+        Assertions.assertEquals("Succeeded", kubeflowHelper.parseGetMessage(message));
+
+        // payload without a "status" object: only requires that parsing does not throw
+        String messageError1 = "{\n" +
+                "    \"apiVersion\": \"kubeflow.org/v1\",\n" +
+                "    \"kind\": \"PyTorchJob\"\n" +
+                "}\n";
+        Assertions.assertDoesNotThrow(() -> kubeflowHelper.parseGetMessage(messageError1));
+
+        // "status" carrying only "phase" (no "conditions"): the phase is still returned
+        String messageError2 = "{\n" +
+                "    \"apiVersion\": \"kubeflow.org/v1\",\n" +
+                "    \"kind\": \"PyTorchJob\",\n" +
+                "    \"status\": {\n" +
+                "        \"phase\": \"Failed\"\n" +
+                "    }\n" +
+                "}\n";
+        Assertions.assertEquals("Failed", kubeflowHelper.parseGetMessage(messageError2));
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java
new file mode 100644
index 0000000000..3a0bfee231
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.kubeflow;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/** Unit tests for KubeflowTask: parameter validation in init(), submit, status tracking and cancel. */
+public class KubeflowTaskTest {
+
+    public static String clusterConfigName = "clusterConfigYAML.yaml";
+
+    public static String jobConfigName = "jobConfigYAML.yaml";
+
+    /** Reads a classpath resource as UTF-8 text, joining its lines with the platform line separator. */
+    public static String readFile(String fileName) throws IOException {
+        String path = KubeflowTaskTest.class.getClassLoader().getResource(fileName).getPath();
+        // readAllLines closes the file itself, unlike a Files.lines stream, which the caller must close
+        return Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8).stream()
+                .collect(Collectors.joining(System.lineSeparator()));
+    }
+
+    @Test
+    public void testInit() throws IOException {
+        KubeflowParameters kubeflowParameters = createKubeflowParameters();
+        KubeflowTask task = createTask(kubeflowParameters);
+        Assertions.assertEquals(kubeflowParameters.getClusterYAML(), task.getParameters().getClusterYAML());
+        Assertions.assertEquals(kubeflowParameters.getYamlContent(), task.getParameters().getYamlContent());
+
+        // yamlContent is set but clusterYAML is not (mocked k8s config is null): createTask is expected to throw
+        KubeflowParameters kubeflowParametersError2 = new KubeflowParameters();
+        kubeflowParametersError2.setYamlContent(readFile(clusterConfigName));
+        Assertions.assertThrows(TaskException.class, () -> createTask(kubeflowParametersError2));
+    }
+
+    @Test
+    public void testSubmit() throws IOException {
+        KubeflowParameters kubeflowParameters = createKubeflowParameters();
+        KubeflowTask task = Mockito.spy(createTask(kubeflowParameters));
+        Mockito.when(task.runCommand(Mockito.anyString())).thenReturn("test_result");
+        task.submitApplication();
+        Assertions.assertNotNull(task.getAppIds());
+        Assertions.assertEquals(TaskConstants.EXIT_CODE_SUCCESS, task.getExitStatusCode());
+    }
+
+    @Test
+    public void testTrack() throws IOException {
+        KubeflowParameters kubeflowParameters = createKubeflowParameters();
+        TaskExecutionContext taskExecutionContext = createTaskExecutionContext(kubeflowParameters);
+        TestTask task = Mockito.spy(new TestTask(taskExecutionContext));
+        Mockito.when(task.runCommand(Mockito.anyString())).thenReturn("track_result");
+        task.init();
+
+        KubeflowHelper kubeflowHelper = Mockito.mock(KubeflowHelper.class);
+        Mockito.when(kubeflowHelper.buildGetCommand(Mockito.anyString())).thenReturn("");
+        task.setKubeflowHelper(kubeflowHelper);
+
+        Mockito.when(kubeflowHelper.parseGetMessage(Mockito.anyString())).thenReturn("Succeeded");
+        task.trackApplicationStatus();
+        Assertions.assertEquals(TaskConstants.EXIT_CODE_SUCCESS, task.getExitStatusCode());
+
+        Mockito.when(kubeflowHelper.parseGetMessage(Mockito.anyString())).thenReturn("Failed");
+        task.trackApplicationStatus();
+        Assertions.assertEquals(TaskConstants.EXIT_CODE_FAILURE, task.getExitStatusCode());
+
+        // consecutive stubbing: the first parse yields an empty phase, the next one "Succeeded"
+        Mockito.when(kubeflowHelper.parseGetMessage(Mockito.anyString())).thenReturn("", "Succeeded");
+        task.trackApplicationStatus();
+        Assertions.assertEquals(TaskConstants.EXIT_CODE_SUCCESS, task.getExitStatusCode());
+    }
+
+    @Test
+    public void testCancel() throws IOException {
+        KubeflowParameters kubeflowParameters = createKubeflowParameters();
+        KubeflowTask task = Mockito.spy(createTask(kubeflowParameters));
+        Mockito.when(task.runCommand(Mockito.anyString())).thenReturn("delete_result");
+        task.cancelApplication();
+        Assertions.assertEquals(TaskConstants.EXIT_CODE_SUCCESS, task.getExitStatusCode());
+    }
+
+    /** Builds a KubeflowTask around a mocked execution context and runs init() on it. */
+    public KubeflowTask createTask(KubeflowParameters kubeflowParameters) {
+        TaskExecutionContext taskExecutionContext = createTaskExecutionContext(kubeflowParameters);
+        KubeflowTask kubeflowTask = new KubeflowTask(taskExecutionContext);
+        kubeflowTask.init();
+        return kubeflowTask;
+    }
+
+    /** Mocks an execution context exposing the parameters as JSON and their cluster YAML as the k8s config. */
+    public TaskExecutionContext createTaskExecutionContext(KubeflowParameters kubeflowParameters) {
+        String parameters = JSONUtils.toJsonString(kubeflowParameters);
+        TaskExecutionContext taskExecutionContext =
+                Mockito.mock(TaskExecutionContext.class, Mockito.RETURNS_DEEP_STUBS);
+        Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+        Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp/dolphinscheduler/kubeflow");
+        File file = new File("/tmp/dolphinscheduler/kubeflow");
+        if (!file.exists()) {
+            file.mkdirs();
+        }
+        Mockito.when(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml())
+                .thenReturn(kubeflowParameters.getClusterYAML());
+        TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
+        return taskExecutionContext;
+    }
+
+    public KubeflowParameters createKubeflowParameters() throws IOException {
+        KubeflowParameters kubeflowParameters = new KubeflowParameters();
+        kubeflowParameters.setClusterYAML(readFile(clusterConfigName));
+        kubeflowParameters.setYamlContent(readFile(jobConfigName));
+        return kubeflowParameters;
+    }
+
+    /** KubeflowTask subclass that lets a test replace the kubeflowHelper collaborator. */
+    public static class TestTask extends KubeflowTask {
+
+        public TestTask(TaskExecutionContext taskExecutionContext) {
+            super(taskExecutionContext);
+        }
+
+        public void setKubeflowHelper(KubeflowHelper kubeflowHelper) {
+            this.kubeflowHelper = kubeflowHelper;
+        }
+    }
+}
diff --git a/dolphinscheduler-api/src/main/resources/task-type-config.yaml b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/resources/clusterConfigYAML.yaml
similarity index 56%
copy from dolphinscheduler-api/src/main/resources/task-type-config.yaml
copy to dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/resources/clusterConfigYAML.yaml
index 0ce770c2c5..9e7b88bba2 100644
--- a/dolphinscheduler-api/src/main/resources/task-type-config.yaml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/resources/clusterConfigYAML.yaml
@@ -15,44 +15,22 @@
 # limitations under the License.
 #
 
-task:
-  universal:
-    - 'SHELL'
-    - 'JAVA'
-    - 'PYTHON'
-    - 'PROCEDURE'
-    - 'SQL'
-    - 'SPARK'
-    - 'FLINK'
-    - 'HTTP'
-    - 'MR'
-    - 'DINKY'
-    - 'FLINK_STREAM'
-    - 'HIVECLI'
-  cloud:
-    - 'EMR'
-    - 'K8S'
-    - 'DMS'
-  logic:
-    - 'SUB_PROCESS'
-    - 'DEPENDENT'
-    - 'CONDITIONS'
-    - 'SWITCH'
-  dataIntegration:
-    - 'SEATUNNEL'
-    - 'DATAX'
-    - 'SQOOP'
-  dataQuality:
-    - 'DATA_QUALITY'
-  machineLearning:
-    - 'JUPYTER'
-    - 'MLFLOW'
-    - 'OPENMLDB'
-    - 'DVC'
-    - 'SAGEMAKER'
-    - 'PYTORCH'
-  other:
-    - 'PIGEON'
-    - 'ZEPPELIN'
-    - 'CHUNJUN'
-    - 'DATASYNC'
\ No newline at end of file
+apiVersion: v1
+clusters:
+  - cluster:
+      certificate-authority-data: 123
+      server: https://192.168.2.90:40787
+    name: kind-kubeflow
+contexts:
+  - context:
+      cluster: kind-kubeflow
+      user: kind-kubeflow
+    name: kind-kubeflow
+current-context: kind-kubeflow
+kind: Config
+preferences: {}
+users:
+  - name: kind-kubeflow
+    user:
+      client-certificate-data: 123
+      client-key-data: 123
diff --git a/dolphinscheduler-api/src/main/resources/task-type-config.yaml b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/resources/jobConfigYAML.yaml
similarity index 56%
copy from dolphinscheduler-api/src/main/resources/task-type-config.yaml
copy to dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/resources/jobConfigYAML.yaml
index 0ce770c2c5..7da5667cb0 100644
--- a/dolphinscheduler-api/src/main/resources/task-type-config.yaml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/resources/jobConfigYAML.yaml
@@ -15,44 +15,22 @@
 # limitations under the License.
 #
 
-task:
-  universal:
-    - 'SHELL'
-    - 'JAVA'
-    - 'PYTHON'
-    - 'PROCEDURE'
-    - 'SQL'
-    - 'SPARK'
-    - 'FLINK'
-    - 'HTTP'
-    - 'MR'
-    - 'DINKY'
-    - 'FLINK_STREAM'
-    - 'HIVECLI'
-  cloud:
-    - 'EMR'
-    - 'K8S'
-    - 'DMS'
-  logic:
-    - 'SUB_PROCESS'
-    - 'DEPENDENT'
-    - 'CONDITIONS'
-    - 'SWITCH'
-  dataIntegration:
-    - 'SEATUNNEL'
-    - 'DATAX'
-    - 'SQOOP'
-  dataQuality:
-    - 'DATA_QUALITY'
-  machineLearning:
-    - 'JUPYTER'
-    - 'MLFLOW'
-    - 'OPENMLDB'
-    - 'DVC'
-    - 'SAGEMAKER'
-    - 'PYTORCH'
-  other:
-    - 'PIGEON'
-    - 'ZEPPELIN'
-    - 'CHUNJUN'
-    - 'DATASYNC'
\ No newline at end of file
+
+apiVersion: "kubeflow.org/v1"
+kind: TFJob
+metadata:
+  name: test-tfjob
+  namespace: kubeflow-user-example-com
+spec:
+  tfReplicaSpecs:
+    Worker:
+      replicas: 2
+      restartPolicy: OnFailure
+      template:
+        spec:
+          containers:
+            - name: tensorflow
+              image: repo/mnist_with_summaries:v1
+              command:
+                - "python"
+                - "mnist_with_summaries.py"
diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml
index 14fe703d7e..dcc0023f7e 100644
--- a/dolphinscheduler-task-plugin/pom.xml
+++ b/dolphinscheduler-task-plugin/pom.xml
@@ -64,6 +64,7 @@
         <module>dolphinscheduler-task-hivecli</module>
         <module>dolphinscheduler-task-dms</module>
         <module>dolphinscheduler-task-datasync</module>
+        <module>dolphinscheduler-task-kubeflow</module>
     </modules>
 
     <dependencyManagement>
diff --git a/dolphinscheduler-ui/public/images/task-icons/kubeflow.png b/dolphinscheduler-ui/public/images/task-icons/kubeflow.png
new file mode 100644
index 0000000000..72121f114b
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/kubeflow.png differ
diff --git a/dolphinscheduler-ui/public/images/task-icons/kubeflow_hover.png b/dolphinscheduler-ui/public/images/task-icons/kubeflow_hover.png
new file mode 100644
index 0000000000..4f51d22281
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/kubeflow_hover.png differ
diff --git a/dolphinscheduler-ui/src/store/project/task-type.ts b/dolphinscheduler-ui/src/store/project/task-type.ts
index b4ee0fd883..e403cbfc68 100644
--- a/dolphinscheduler-ui/src/store/project/task-type.ts
+++ b/dolphinscheduler-ui/src/store/project/task-type.ts
@@ -141,6 +141,10 @@ export const TASK_TYPES_MAP = {
   DATASYNC: {
     alias: 'DATASYNC',
     helperLinkDisable: true
+  },
+  KUBEFLOW: {
+    alias: 'KUBEFLOW',
+    helperLinkDisable: true
   }
 } as {
   [key in TaskType]: {
diff --git a/dolphinscheduler-ui/src/store/project/types.ts b/dolphinscheduler-ui/src/store/project/types.ts
index 19c2015d17..0cc500385d 100644
--- a/dolphinscheduler-ui/src/store/project/types.ts
+++ b/dolphinscheduler-ui/src/store/project/types.ts
@@ -54,6 +54,7 @@ type TaskType =
   | 'HIVECLI'
   | 'DMS'
   | 'DATASYNC'
+  | 'KUBEFLOW'
 
 type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON'
 type DependentResultType = {
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
index 584359b593..d0df64cdba 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
@@ -82,3 +82,4 @@ export { usePytorch } from './use-pytorch'
 export { useHiveCli } from './use-hive-cli'
 export { useDms } from './use-dms'
 export { useDatasync } from './use-datasync'
+export { useKubeflow } from './use-kubeflow'
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-kubeflow.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-kubeflow.ts
new file mode 100644
index 0000000000..e03e524cf3
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-kubeflow.ts
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import type { IJsonItem } from '../types'
+import { useCustomParams, useNamespace } from '.'
+
+/** Kubeflow task form items: namespace, a required YAML editor, then the custom params. */
+export function useKubeflow(model: { [field: string]: any }): IJsonItem[] {
+  return [
+    useNamespace(),
+    {
+      type: 'editor',
+      field: 'yamlContent',
+      name: 'yamlContent',
+      props: { language: 'yaml' },
+      validate: {
+        trigger: ['input', 'trigger'],
+        required: true,
+        // Was 'requestJson', copied from another task; the message must name this field.
+        message: 'yamlContent is required'
+      }
+    },
+    ...useCustomParams({ model, field: 'localParams', isSimple: false })
+  ]
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 3066d374af..276b872f4b 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -456,6 +456,11 @@ export function formatParams(data: INodeData): {
     taskParams.cloudWatchLogGroupArn = data.cloudWatchLogGroupArn
   }
 
+  if (data.taskType === 'KUBEFLOW') {
+    taskParams.yamlContent = data.yamlContent
+    taskParams.namespace = data.namespace
+  }
+
   let timeoutNotifyStrategy = ''
   if (data.timeoutNotifyStrategy) {
     if (data.timeoutNotifyStrategy.length === 1) {
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
index 8debfe6e9f..6fa4c47ee0 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
@@ -47,7 +47,9 @@ import { useChunjun } from './use-chunjun'
 import { usePytorch } from './use-pytorch'
 import { useHiveCli } from './use-hive-cli'
 import { useDms } from './use-dms'
-import {useDatasync} from "./use-datasync";
+import { useDatasync } from './use-datasync'
+import { useKubeflow } from './use-kubeflow'
+
 
 export default {
   SHELL: useShell,
@@ -82,5 +84,6 @@ export default {
   PYTORCH: usePytorch,
   HIVECLI: useHiveCli,
   DMS: useDms,
-  DATASYNC: useDatasync
+  DATASYNC: useDatasync,
+  KUBEFLOW: useKubeflow
 }
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-kubeflow.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-kubeflow.ts
new file mode 100644
index 0000000000..be784c0bca
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-kubeflow.ts
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import * as Fields from '../fields/index'
+import type { IJsonItem, INodeData, ITaskData } from '../types'
+
+/** Builds the reactive default model of a KUBEFLOW task and returns it with the field list `json`. */
+export function useKubeflow({
+  projectCode,
+  from = 0,
+  readonly,
+  data
+}: {
+  projectCode: number
+  from?: number
+  readonly?: boolean
+  data?: ITaskData
+}) {
+  const model = reactive({
+    name: '',
+    taskType: 'KUBEFLOW',
+    flag: 'YES',
+    description: '',
+    timeoutFlag: false,
+    localParams: [],
+    environmentCode: null,
+    failRetryInterval: 1,
+    failRetryTimes: 0,
+    workerGroup: 'default',
+    delayTime: 0,
+    timeout: 30,
+    timeoutNotifyStrategy: ['WARN']
+  } as INodeData)
+  return {
+    json: [
+      Fields.useName(from),
+      ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
+      Fields.useRunFlag(),
+      Fields.useDescription(),
+      Fields.useTaskPriority(),
+      Fields.useWorkerGroup(),
+      Fields.useEnvironmentName(model, !data?.id),
+      ...Fields.useTaskGroup(model, projectCode),
+      ...Fields.useFailed(),
+      Fields.useDelayTime(model),
+      ...Fields.useTimeoutAlarm(model),
+      ...Fields.useKubeflow(model),
+      Fields.usePreTasks()
+    ] as IJsonItem[],
+    model
+  }
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 75a592fb0f..6f07500cc8 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -395,6 +395,7 @@ interface ITaskParams {
   sourceLocationArn?: string
   name?: string
   cloudWatchLogGroupArn?: string
+  yamlContent?: string
 }
 
 interface INodeData
diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
index e1b4ccbd8b..8de119648f 100644
--- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
@@ -48,6 +48,7 @@ export type TaskType =
   | 'HIVECLI'
   | 'DMS'
   | 'DATASYNC'
+  | 'KUBEFLOW'
 
 export type TaskExecuteType = 'STREAM' | 'BATCH'
 
@@ -170,6 +171,10 @@ export const TASK_TYPES_MAP = {
   DATASYNC: {
     alias: 'DATASYNC',
     helperLinkDisable: true
+  },
+  KUBEFLOW: {
+    alias: 'KUBEFLOW',
+    helperLinkDisable: true
   }
 } as {
   [key in TaskType]: {
diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
index b864c03b2c..09cd094732 100644
--- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
+++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
@@ -198,6 +198,9 @@ $bgLight: #ffffff;
     &.icon-datasync {
       background-image: url('/images/task-icons/datasync_hover.png');
     }
+    &.icon-kubeflow {
+      background-image: url('/images/task-icons/kubeflow_hover.png');
+    }
   }
 
   &:hover {
@@ -299,6 +302,9 @@ $bgLight: #ffffff;
       &.icon-datasync {
         background-image: url('/images/task-icons/datasync.png');
       }
+      &.icon-kubeflow {
+        background-image: url('/images/task-icons/kubeflow.png');
+      }
     }
   }