Posted to commits@dolphinscheduler.apache.org by ke...@apache.org on 2022/06/21 05:13:50 UTC

[dolphinscheduler] branch dev updated: [Feature][Task Plugin] Add DVC task plugin for MLops scenario (#10372) (#10407)

This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 36e20cdfc8 [Feature][Task Plugin] Add DVC task plugin for MLops scenario (#10372) (#10407)
36e20cdfc8 is described below

commit 36e20cdfc8d668b56ab60e273a7327ca41c57a04
Author: JieguangZhou <ji...@163.com>
AuthorDate: Tue Jun 21 13:13:46 2022 +0800

    [Feature][Task Plugin] Add DVC task plugin for MLops scenario (#10372) (#10407)
---
 docs/docs/en/guide/task/dvc.md                     | 125 +++++++++++++++
 docs/docs/zh/guide/task/dvc.md                     | 110 +++++++++++++
 docs/img/tasks/demo/dvc_download.png               | Bin 0 -> 24920 bytes
 docs/img/tasks/demo/dvc_env_config.png             | Bin 0 -> 122301 bytes
 docs/img/tasks/demo/dvc_env_name.png               | Bin 0 -> 49877 bytes
 docs/img/tasks/demo/dvc_init.png                   | Bin 0 -> 16107 bytes
 docs/img/tasks/demo/dvc_upload.png                 | Bin 0 -> 28869 bytes
 docs/img/tasks/icons/dvc.png                       | Bin 0 -> 7649 bytes
 .../dolphinscheduler-task-all/pom.xml              |   6 +
 .../dolphinscheduler-task-dvc/pom.xml              |  46 ++++++
 .../plugin/task/dvc/DvcConstants.java              |  56 +++++++
 .../plugin/task/dvc/DvcParameters.java             | 105 +++++++++++++
 .../dolphinscheduler/plugin/task/dvc/DvcTask.java  | 163 ++++++++++++++++++++
 .../plugin/task/dvc/DvcTaskChannel.java            |  51 ++++++
 .../plugin/task/dvc/DvcTaskChannelFactory.java     |  58 +++++++
 .../plugin/task/dvc/TaskTypeEnum.java              |  30 ++++
 .../plugin/task/dvc/DvcTaskTest.java               | 153 ++++++++++++++++++
 dolphinscheduler-task-plugin/pom.xml               |   3 +-
 .../public/images/task-icons/dvc.png               | Bin 0 -> 7649 bytes
 .../public/images/task-icons/dvc_hover.png         | Bin 0 -> 17007 bytes
 dolphinscheduler-ui/src/locales/en_US/project.ts   |  10 ++
 dolphinscheduler-ui/src/locales/zh_CN/project.ts   |  10 ++
 .../projects/task/components/node/fields/index.ts  |   1 +
 .../task/components/node/fields/use-dvc.ts         | 171 +++++++++++++++++++++
 .../projects/task/components/node/format-data.ts   |  11 ++
 .../projects/task/components/node/tasks/index.ts   |   4 +-
 .../projects/task/components/node/tasks/use-dvc.ts |  82 ++++++++++
 .../views/projects/task/components/node/types.ts   |   7 +
 .../src/views/projects/task/constants/task-type.ts |   5 +
 .../workflow/components/dag/dag.module.scss        |   6 +
 30 files changed, 1211 insertions(+), 2 deletions(-)

diff --git a/docs/docs/en/guide/task/dvc.md b/docs/docs/en/guide/task/dvc.md
new file mode 100644
index 0000000000..102295eb40
--- /dev/null
+++ b/docs/docs/en/guide/task/dvc.md
@@ -0,0 +1,125 @@
+# DVC Node
+
+## Overview
+
+[DVC (Data Version Control)](https://dvc.org) is an open-source version control system for machine learning projects.
+
+The DVC plugin brings DVC's data version management to DolphinScheduler, making it easy for users to version their data.
+
+The plugin provides the following three functions:
+
+- Init DVC: Initialize a Git repository as a DVC repository and bind a remote storage address where the actual data is stored.
+- Upload: Add data to, or update data in, the repository and record a version tag.
+- Download: Download a specific version of data from the repository.
+
+## Create Task
+
+- Click `Project -> Management-Project -> Name-Workflow Definition`, and click the "Create Workflow" button to enter the
+  DAG editing page.
+- Drag the <img src="../../../../img/tasks/icons/dvc.png" width="15"/> task node from the toolbar onto the canvas.
+
+## Task Example
+
+First, here are the general DolphinScheduler parameters:
+
+- **Node name**: The name of the task. Node names must be unique within a workflow definition.
+- **Run flag**: Indicates whether this node is scheduled normally. If it does not need to execute, select
+  `prohibition execution`.
+- **Descriptive information**: Describe the function of the node.
+- **Task priority**: When the number of worker threads is insufficient, tasks execute in order of priority from high
+  to low, and tasks with the same priority execute in first-in, first-out order.
+- **Worker grouping**: Assign the task to machines in the worker group for execution. If `Default` is selected,
+  a worker machine is chosen at random.
+- **Environment Name**: Configure the environment in which to run the script.
+- **Times of failed retry attempts**: The number of times a failed task is resubmitted.
+- **Failed retry interval**: The interval, in minutes, before a failed task is resubmitted.
+- **Delayed execution time**: The time, in minutes, by which task execution is delayed.
+- **Timeout alarm**: Check timeout alarm and timeout failure. If the task runs longer than the "timeout", an alarm
+  email is sent and the task fails.
+- **Predecessor task**: Select predecessor tasks for the current task; the selected tasks are set as
+  upstream of the current task.
+
+Here are some specific parameters for the DVC plugin:
+
+- **DVC Task Type**: Upload, Download, or Init DVC.
+- **DVC Repository**: The DVC repository address associated with the task.
+
+### Init DVC
+
+Initialize the Git repository as a DVC repository and add a new data remote to save data.
+
+After the project is initialized, it is still a Git repository, but with DVC features added.
+
+The data itself is not stored in the Git repository but in remote storage; DVC tracks the version and location of the data and manages the link between the two.
+
+![dvc_init](../../../../img/tasks/demo/dvc_init.png)
+
+**Task Parameter**
+
+- **Remote Store Url**: The address where the actual data is stored. For the supported storage types, see [DVC supported storage types](https://dvc.org/doc/command-reference/remote/add#supported-storage-types).
+
+The example above initializes the repository `git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git` as a DVC project
+and binds the remote storage address to `~/dvc`.
+
+### Upload
+
+Upload or update data and record a version tag.
+
+![dvc_upload](../../../../img/tasks/demo/dvc_upload.png)
+
+**Task Parameter**
+
+- **Data Path in DVC Repository**: The path in the repository to which the data is uploaded.
+- **Data Path In Worker**: The path of the data on the worker to upload.
+- **Version**: The version tag for the uploaded data, recorded with `git tag`.
+- **Version Message**: The message attached to the version tag.
+
+The example above works as follows:
+
+It uploads the data `/home/data/iris` to the root directory of repository `git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git`, naming the uploaded file or folder `iris`.
+
+It then runs `git tag "iris_1.0" -m "init iris data"`, recording the version tag `iris_1.0` with the version message `init iris data`.
+
+### Download
+
+Download a specific version of data.
+
+![dvc_download](../../../../img/tasks/demo/dvc_download.png)
+
+**Task Parameter**
+
+- **Data Path in DVC Repository**: The path, within the DVC repository, of the data to download.
+- **Data Path In Worker**: The local path on the worker where the downloaded data is saved.
+- **Version**: The version of the data to download.
+
+The example above works as follows:
+
+It downloads the `iris` data at version `iris_1.0` from repository `git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git` to `~/dvc_test/iris`.
+
+## Environment Preparation
+
+### Install DVC
+
+Make sure DVC is installed on the worker. If it is not, install it with `pip install dvc`.
+
+Find the path of the `dvc` executable and configure the environment variables accordingly.
+
+The following uses a conda environment as an example.
+
+Install DVC with pip inside conda, and configure conda's environment variables so that the component can find the `dvc` command:
+
+```shell
+which dvc
+# >> ~/anaconda3/bin/dvc
+```
+
+Log in with the admin account and configure a conda environment (install
+[anaconda](https://docs.continuum.io/anaconda/install/)
+or [miniconda](https://docs.conda.io/en/latest/miniconda.html#installing) in advance).
+
+![dvc_env_config](../../../../img/tasks/demo/dvc_env_config.png)
+
+Note: when configuring the task, select the conda environment created above; otherwise, the program cannot find
+the conda environment.
+
+![dvc_env_name](../../../../img/tasks/demo/dvc_env_name.png)
\ No newline at end of file
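To make the Upload example concrete, here is a Java sketch that expands the upload templates from `DvcConstants` (copied verbatim from this commit) with the example values used in the English guide. The class name `UploadScriptSketch` and its `render` helper are hypothetical; they only mirror the order in which `DvcTask` joins the upload templates with newlines. The repository URL keeps the `<YOUR-NAME-OR-ORG>` placeholder.

```java
import java.util.Arrays;

// Hypothetical sketch: renders the Upload script the same way DvcTask joins its templates.
final class UploadScriptSketch {
    // Templates copied verbatim from DvcConstants in this commit.
    static final String CHECK_AND_SET_DVC_REPO =
            "which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=%s";
    static final String SET_DATA_PATH = "DVC_DATA_PATH=%s";
    static final String SET_DATA_LOCATION = "DVC_DATA_LOCATION=%s";
    static final String SET_VERSION = "DVC_VERSION=%s";
    static final String SET_MESSAGE = "DVC_MESSAGE=\"%s\"";
    static final String GIT_CLONE_DVC_REPO = "git clone $DVC_REPO dvc-repository; cd dvc-repository; pwd";
    static final String DVC_AUTOSTAGE = "dvc config core.autostage true --local || exit 1";
    static final String DVC_ADD_DATA = "dvc add $DVC_DATA_PATH -v -o $DVC_DATA_LOCATION --to-remote || exit 1";
    static final String GIT_UPDATE_FOR_UPDATE_DATA = "git commit -am \"$DVC_MESSAGE\"\n"
            + "git tag \"$DVC_VERSION\" -m \"$DVC_MESSAGE\"\n"
            + "git push --all\n"
            + "git push --tags";

    /** workerPath = "Data Path In Worker", repoPath = "Data Path in DVC Repository". */
    static String render(String repo, String workerPath, String repoPath, String version, String message) {
        return String.join("\n", Arrays.asList(
                String.format(CHECK_AND_SET_DVC_REPO, repo),
                String.format(SET_DATA_PATH, workerPath),
                String.format(SET_DATA_LOCATION, repoPath),
                String.format(SET_VERSION, version),
                String.format(SET_MESSAGE, message),
                GIT_CLONE_DVC_REPO,
                DVC_AUTOSTAGE,
                DVC_ADD_DATA,
                GIT_UPDATE_FOR_UPDATE_DATA));
    }

    public static void main(String[] args) {
        // Values taken from the Upload example in the guide.
        System.out.println(render(
                "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git",
                "/home/data/iris", "iris", "iris_1.0", "init iris data"));
    }
}
```

Only the message is wrapped in double quotes by its template; the repository, paths, and version are substituted unquoted, so values containing spaces would need extra quoting.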
diff --git a/docs/docs/zh/guide/task/dvc.md b/docs/docs/zh/guide/task/dvc.md
new file mode 100644
index 0000000000..3f9ff91907
--- /dev/null
+++ b/docs/docs/zh/guide/task/dvc.md
@@ -0,0 +1,110 @@
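+# DVC Node
+
+## Overview
+
+[DVC (Data Version Control)](https://dvc.org) is an open-source version control system for machine learning in the MLOps field.
+
+The DVC component brings DVC's data version management to DS (DolphinScheduler), helping users manage data versions easily. The component provides the following three functions:
+
+- Init DVC: Initialize a Git repository as a DVC repository and bind an address for storing the actual data.
+- Upload: Add specific data to, or update it in, the repository and record a version tag.
+- Download: Download a specific version of data from the repository.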
+
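+## Create Task
+
+- Click Project Management - Project Name - Workflow Definition, then click the "Create Workflow" button to enter the DAG editing page.
+- Drag the <img src="../../../../img/tasks/icons/dvc.png" width="15"/> task node from the toolbar onto the canvas.
+
+## Task Example
+
+First, here are some general DS parameters:
+
+- **Node name**: Set the name of the task. Node names must be unique within a workflow definition.
+- **Run flag**: Indicates whether this node is scheduled normally. If it does not need to execute, turn on the prohibit-execution switch.
+- **Description**: Describe the function of the node.
+- **Task priority**: When the number of worker threads is insufficient, tasks execute in order of priority from high to low; tasks with the same priority execute first-in, first-out.
+- **Worker group**: The task is assigned to machines in the worker group. If Default is selected, a worker machine is chosen at random.
+- **Environment name**: Configure the environment in which to run the script.
+- **Number of failed retries**: The number of times a failed task is resubmitted.
+- **Failed retry interval**: The interval, in minutes, before a failed task is resubmitted.
+- **Delayed execution time**: The time, in minutes, by which task execution is delayed.
+- **Timeout alarm**: Check timeout alarm and timeout failure. If the task runs longer than the "timeout duration", an alarm email is sent and the task fails.
+- **Predecessor task**: Select predecessor tasks for the current task; the selected tasks are set as upstream of the current task.
+
+Here are some common parameters of the DVC component:
+
+- **DVC task type**: Upload, Download, or Init DVC.
+- **DVC repository**: The repository address associated with the task execution.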
+
+### Init DVC
+
+
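+Initialize a Git repository as a DVC repository and bind the location where the data is stored.
+
+After initialization, the project is still a Git repository, but with DVC features added.
+
+The data is not actually stored in the Git repository but elsewhere; DVC tracks the version and location of the data and manages this relationship.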
+
+![dvc_init](../../../../img/tasks/demo/dvc_init.png)
+
+**Task Parameter**
+
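+- **Data storage address**: The address where the actual data is stored.
+  For the supported storage types, see
+  [DVC supported storage types](https://dvc.org/doc/command-reference/remote/add#supported-storage-types).
+
+The example above initializes the repository `git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git` as a DVC project and binds the remote storage address to `~/dvc`.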
+
+### Upload
+
+Upload and update data, and record a version tag.
+
+![dvc_upload](../../../../img/tasks/demo/dvc_upload.png)
+
+**Task Parameter**
+
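+- **Data path in DVC repository**: The path in the repository where the uploaded data is saved.
+- **Data path in worker**: The path of the data to upload.
+- **Data version**: The version tag applied to this version of the data after upload; it is saved in git tag.
+- **Data version message**: The note recorded for this upload.
+
+The example above uploads the data `/home/data/iris` to the root directory of repository `git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git`,
+naming the data file/folder `iris`. It then runs `git tag "iris_1.0" -m "init iris data"`, recording the version tag `iris_1.0` and the version message `init iris data`.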
+
+### Download
+
+Download a specific version of data.
+
+![dvc_download](../../../../img/tasks/demo/dvc_download.png)
+
+**Task Parameter**
+
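+- **Data path in DVC repository**: The path in the repository of the data to download.
+- **Data path in worker**: The local path where the downloaded data is saved.
+- **Data version**: The version of the data to download.
+
+The example above downloads the iris data at version `iris_1.0` from repository `git@github.com:xxxx/dvc-data-repository-example.git` to `~/dvc_test/iris`.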
+
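+## Environment Preparation
+
+### Install DVC
+
+Make sure DVC is installed; you can install it with `pip install dvc`.
+
+Get the path of `dvc` and configure the environment variables.
+
+The following uses a pip installation of Python on conda as an example: configure conda's environment variables so that the component can find the `dvc` command.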
+
+```shell
+which dvc
+# >> ~/anaconda3/bin/dvc
+```
+
+You need to log in with the admin account to configure a conda environment (please [install anaconda](https://docs.continuum.io/anaconda/install/)
+or [install miniconda](https://docs.conda.io/en/latest/miniconda.html#installing) in advance).
+
+![dvc_env_config](../../../../img/tasks/demo/dvc_env_config.png)
+
+When configuring the task afterwards, select the conda environment created above; otherwise the program cannot find the conda environment.
+
+![dvc_env_name](../../../../img/tasks/demo/dvc_env_name.png)
diff --git a/docs/img/tasks/demo/dvc_download.png b/docs/img/tasks/demo/dvc_download.png
new file mode 100644
index 0000000000..b6ed9edd61
Binary files /dev/null and b/docs/img/tasks/demo/dvc_download.png differ
diff --git a/docs/img/tasks/demo/dvc_env_config.png b/docs/img/tasks/demo/dvc_env_config.png
new file mode 100644
index 0000000000..3bbfe44cec
Binary files /dev/null and b/docs/img/tasks/demo/dvc_env_config.png differ
diff --git a/docs/img/tasks/demo/dvc_env_name.png b/docs/img/tasks/demo/dvc_env_name.png
new file mode 100644
index 0000000000..17973ab621
Binary files /dev/null and b/docs/img/tasks/demo/dvc_env_name.png differ
diff --git a/docs/img/tasks/demo/dvc_init.png b/docs/img/tasks/demo/dvc_init.png
new file mode 100644
index 0000000000..7da5ada16e
Binary files /dev/null and b/docs/img/tasks/demo/dvc_init.png differ
diff --git a/docs/img/tasks/demo/dvc_upload.png b/docs/img/tasks/demo/dvc_upload.png
new file mode 100644
index 0000000000..274c1a813a
Binary files /dev/null and b/docs/img/tasks/demo/dvc_upload.png differ
diff --git a/docs/img/tasks/icons/dvc.png b/docs/img/tasks/icons/dvc.png
new file mode 100644
index 0000000000..404640c862
Binary files /dev/null and b/docs/img/tasks/icons/dvc.png differ
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
index 2422a4628e..7a9d8d91ae 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
@@ -171,6 +171,12 @@
             <artifactId>dolphinscheduler-task-openmldb</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-dvc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/pom.xml
new file mode 100644
index 0000000000..a0435647ca
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dolphinscheduler-task-plugin</artifactId>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dolphinscheduler-task-dvc</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-spi</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java
new file mode 100644
index 0000000000..b15d6d450c
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dvc;
+
+public class DvcConstants {
+    private DvcConstants() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    public static final String CHECK_AND_SET_DVC_REPO = "which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=%s";
+
+    public static final String SET_DATA_PATH = "DVC_DATA_PATH=%s";
+
+    public static final String SET_DATA_LOCATION = "DVC_DATA_LOCATION=%s";
+
+    public static final String SET_VERSION = "DVC_VERSION=%s";
+
+    public static final String SET_MESSAGE = "DVC_MESSAGE=\"%s\"";
+
+    public static final String GIT_CLONE_DVC_REPO = "git clone $DVC_REPO dvc-repository; cd dvc-repository; pwd";
+
+    public static final String DVC_AUTOSTAGE = "dvc config core.autostage true --local || exit 1";
+
+    public static final String DVC_ADD_DATA = "dvc add $DVC_DATA_PATH -v -o $DVC_DATA_LOCATION --to-remote || exit 1";
+
+    public static final String GIT_UPDATE_FOR_UPDATE_DATA = "git commit -am \"$DVC_MESSAGE\"\n" +
+            "git tag \"$DVC_VERSION\" -m \"$DVC_MESSAGE\"\n" +
+            "git push --all\n" +
+            "git push --tags";
+
+    public static final String DVC_DOWNLOAD = "dvc get $DVC_REPO $DVC_DATA_LOCATION -o $DVC_DATA_PATH -v --rev $DVC_VERSION";
+
+
+    public static final String DVC_INIT = "dvc init || exit 1";
+
+    public static final String DVC_ADD_REMOTE = "dvc remote add origin %s -d";
+
+    public static final String GIT_UPDATE_FOR_INIT_DVC = "git commit -am \"init dvc project and add remote\"; git push";
+
+}
+
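The `SET_*` templates substitute user-supplied values straight into shell assignments, and `SET_MESSAGE` wraps its value in double quotes, so a version message containing `"`, `$`, or backticks would still be interpreted by the shell. One possible hardening, shown here as a hypothetical Java helper that is not part of this commit, is POSIX single-quote escaping applied to a value before it is formatted into a template that has no quotes of its own.

```java
// Hypothetical helper, not part of this commit: POSIX-shell single-quote escaping.
final class ShellQuoteSketch {
    /** Wraps a value in single quotes; each embedded ' becomes '\'' so the shell reads it literally. */
    static String quote(String value) {
        return "'" + value.replace("'", "'\\''") + "'";
    }

    public static void main(String[] args) {
        // A template without its own quotes (e.g. "DVC_MESSAGE=%s") receives the pre-quoted value.
        System.out.println(String.format("DVC_MESSAGE=%s", quote("fix \"iris\" $HOME it's done")));
        // prints: DVC_MESSAGE='fix "iris" $HOME it'\''s done'
    }
}
```

Inside single quotes the shell performs no expansion at all, which is why the only character needing special treatment is the single quote itself.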
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java
new file mode 100644
index 0000000000..97c404cb29
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dvc;
+
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+public class DvcParameters extends AbstractParameters {
+
+    /**
+     * common parameters
+     */
+
+    private TaskTypeEnum dvcTaskType;
+
+    private String dvcRepository;
+
+    private String dvcVersion;
+
+    private String dvcDataLocation;
+
+    private String dvcMessage;
+
+    private String dvcLoadSaveDataPath;
+
+    private String dvcStoreUrl;
+
+    public void setDvcTaskType(TaskTypeEnum dvcTaskType) {
+        this.dvcTaskType = dvcTaskType;
+    }
+
+    public TaskTypeEnum getDvcTaskType() {
+        return dvcTaskType;
+    }
+
+    public void setDvcRepository(String dvcRepository) {
+        this.dvcRepository = dvcRepository;
+    }
+
+    public String getDvcRepository() {
+        return dvcRepository;
+    }
+
+    public void setDvcVersion(String dvcVersion) {
+        this.dvcVersion = dvcVersion;
+    }
+
+    public String getDvcVersion() {
+        return dvcVersion;
+    }
+
+    public void setDvcDataLocation(String dvcDataLocation) {
+        this.dvcDataLocation = dvcDataLocation;
+    }
+
+    public String getDvcDataLocation() {
+        return dvcDataLocation;
+    }
+
+    public void setDvcMessage(String dvcMessage) {
+        this.dvcMessage = dvcMessage;
+    }
+
+    public String getDvcMessage() {
+        return dvcMessage;
+    }
+
+    public void setDvcLoadSaveDataPath(String dvcLoadSaveDataPath) {
+        this.dvcLoadSaveDataPath = dvcLoadSaveDataPath;
+    }
+
+    public String getDvcLoadSaveDataPath() {
+        return dvcLoadSaveDataPath;
+    }
+
+    public void setDvcStoreUrl(String dvcStoreUrl) {
+        this.dvcStoreUrl = dvcStoreUrl;
+    }
+
+    public String getDvcStoreUrl() {
+        return dvcStoreUrl;
+    }
+
+    @Override
+    public boolean checkParameters() {
+        Boolean checkResult = true;
+        return checkResult;
+    }
+
+}
+
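In this commit, `checkParameters()` always returns `true`. The Java sketch below shows what per-type validation could look like. It requires exactly the fields that each command builder in `DvcTask` substitutes: the repository for every type, the store URL for Init, the two data paths and the version for Upload and Download, and the message for Upload. The class and its local `TaskType` enum are hypothetical stand-ins for `DvcParameters` and `TaskTypeEnum`; only the constant names `UPLOAD`, `DOWNLOAD`, and `INIT`, which `DvcTask` references, are taken from the commit.

```java
// Hypothetical stand-in for DvcParameters#checkParameters with per-type rules.
final class DvcParamsCheckSketch {
    // Mirrors the TaskTypeEnum constants referenced by DvcTask.
    enum TaskType { UPLOAD, DOWNLOAD, INIT }

    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    static boolean check(TaskType type, String repository, String storeUrl,
                         String workerPath, String repoPath, String version, String message) {
        if (type == null || isBlank(repository)) {
            return false; // every generated script starts with DVC_REPO=<repository>
        }
        switch (type) {
            case INIT:
                return !isBlank(storeUrl); // substituted into "dvc remote add origin %s -d"
            case DOWNLOAD:
                return !isBlank(workerPath) && !isBlank(repoPath) && !isBlank(version);
            case UPLOAD:
                return !isBlank(workerPath) && !isBlank(repoPath) && !isBlank(version) && !isBlank(message);
            default:
                return false;
        }
    }
}
```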
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
new file mode 100644
index 0000000000..24bd015dc9
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dvc;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * dvc task
+ */
+public class DvcTask extends AbstractTaskExecutor {
+
+    /**
+     * dvc parameters
+     */
+    private DvcParameters parameters;
+
+    /**
+     * shell command executor
+     */
+    private ShellCommandExecutor shellCommandExecutor;
+
+    /**
+     * taskExecutionContext
+     */
+    private TaskExecutionContext taskExecutionContext;
+
+    /**
+     * constructor
+     *
+     * @param taskExecutionContext taskExecutionContext
+     */
+    public DvcTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+
+        this.taskExecutionContext = taskExecutionContext;
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger);
+    }
+
+    @Override
+    public void init() {
+        logger.info("dvc task params {}", taskExecutionContext.getTaskParams());
+
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DvcParameters.class);
+
+        if (!parameters.checkParameters()) {
+            throw new RuntimeException("dvc task params are not valid");
+        }
+    }
+
+    @Override
+    public void handle() throws Exception {
+        try {
+            // construct process
+            String command = buildCommand();
+            TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
+            setExitStatusCode(commandExecuteResult.getExitStatusCode());
+            setAppIds(commandExecuteResult.getAppIds());
+            setProcessId(commandExecuteResult.getProcessId());
+            parameters.dealOutParam(shellCommandExecutor.getVarPool());
+        } catch (Exception e) {
+            logger.error("dvc task error", e);
+            setExitStatusCode(EXIT_CODE_FAILURE);
+            throw e;
+        }
+    }
+
+    @Override
+    public void cancelApplication(boolean cancelApplication) throws Exception {
+        // cancel process
+        shellCommandExecutor.cancelApplication();
+    }
+
+    public String buildCommand() {
+        String command = "";
+        TaskTypeEnum taskType = parameters.getDvcTaskType();
+        if (taskType == TaskTypeEnum.UPLOAD) {
+            command = buildUploadCommand();
+        } else if (taskType == TaskTypeEnum.DOWNLOAD) {
+            command = buildDownloadCommand();
+        } else if (taskType == TaskTypeEnum.INIT) {
+            command = buildInitDvcCommand();
+        }
+        logger.info("Run DVC task with command: \n{}", command);
+        return command;
+    }
+
+    private String buildUploadCommand() {
+        List<String> args = new ArrayList<>();
+        args.add(String.format(DvcConstants.CHECK_AND_SET_DVC_REPO, parameters.getDvcRepository()));
+        args.add(String.format(DvcConstants.SET_DATA_PATH, parameters.getDvcLoadSaveDataPath()));
+        args.add(String.format(DvcConstants.SET_DATA_LOCATION, parameters.getDvcDataLocation()));
+        args.add(String.format(DvcConstants.SET_VERSION, parameters.getDvcVersion()));
+        args.add(String.format(DvcConstants.SET_MESSAGE, parameters.getDvcMessage()));
+        args.add(DvcConstants.GIT_CLONE_DVC_REPO);
+        args.add(DvcConstants.DVC_AUTOSTAGE);
+        args.add(DvcConstants.DVC_ADD_DATA);
+        args.add(DvcConstants.GIT_UPDATE_FOR_UPDATE_DATA);
+
+        String command = String.join("\n", args);
+        return command;
+
+    }
+
+    private String buildDownloadCommand() {
+        List<String> args = new ArrayList<>();
+        args.add(String.format(DvcConstants.CHECK_AND_SET_DVC_REPO, parameters.getDvcRepository()));
+        args.add(String.format(DvcConstants.SET_DATA_PATH, parameters.getDvcLoadSaveDataPath()));
+        args.add(String.format(DvcConstants.SET_DATA_LOCATION, parameters.getDvcDataLocation()));
+        args.add(String.format(DvcConstants.SET_VERSION, parameters.getDvcVersion()));
+        args.add(DvcConstants.DVC_DOWNLOAD);
+
+        String command = String.join("\n", args);
+        return command;
+
+    }
+
+    private String buildInitDvcCommand() {
+        List<String> args = new ArrayList<>();
+        args.add(String.format(DvcConstants.CHECK_AND_SET_DVC_REPO, parameters.getDvcRepository()));
+        args.add(DvcConstants.GIT_CLONE_DVC_REPO);
+        args.add(DvcConstants.DVC_INIT);
+        args.add(String.format(DvcConstants.DVC_ADD_REMOTE, parameters.getDvcStoreUrl()));
+        args.add(DvcConstants.GIT_UPDATE_FOR_INIT_DVC);
+
+        String command = String.join("\n", args);
+        return command;
+
+    }
+
+
+    @Override
+    public AbstractParameters getParameters() {
+        return parameters;
+    }
+
+
+}
+
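`buildCommand()` returns an empty string when the task type matches none of its branches, so a misconfigured task would run an empty script. Below is a minimal Java sketch of an alternative dispatch that fails fast instead, using the Init templates copied verbatim from `DvcConstants`. The class `InitDispatchSketch` and its local `TaskType` enum are hypothetical, and only the Init branch is spelled out.

```java
import java.util.Arrays;

// Hypothetical sketch: exhaustive dispatch that rejects missing or unknown task types instead of returning "".
final class InitDispatchSketch {
    enum TaskType { UPLOAD, DOWNLOAD, INIT }

    // Templates copied verbatim from DvcConstants in this commit.
    static final String CHECK_AND_SET_DVC_REPO =
            "which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=%s";
    static final String GIT_CLONE_DVC_REPO = "git clone $DVC_REPO dvc-repository; cd dvc-repository; pwd";
    static final String DVC_INIT = "dvc init || exit 1";
    static final String DVC_ADD_REMOTE = "dvc remote add origin %s -d";
    static final String GIT_UPDATE_FOR_INIT_DVC = "git commit -am \"init dvc project and add remote\"; git push";

    static String buildCommand(TaskType type, String repository, String storeUrl) {
        if (type == null) {
            throw new IllegalArgumentException("dvc task type must be set");
        }
        switch (type) {
            case INIT:
                // Same order as DvcTask: check dvc, clone, init, add default remote, commit and push.
                return String.join("\n", Arrays.asList(
                        String.format(CHECK_AND_SET_DVC_REPO, repository),
                        GIT_CLONE_DVC_REPO,
                        DVC_INIT,
                        String.format(DVC_ADD_REMOTE, storeUrl),
                        GIT_UPDATE_FOR_INIT_DVC));
            case UPLOAD:
            case DOWNLOAD:
                // Omitted here; these would mirror the Upload/Download builders in DvcTask.
                throw new UnsupportedOperationException("not covered by this sketch: " + type);
            default:
                throw new IllegalArgumentException("unknown dvc task type: " + type);
        }
    }
}
```

For the Init example in the guide, `buildCommand(TaskType.INIT, "<repo>", "~/dvc")` yields a script whose remote line reads `dvc remote add origin ~/dvc -d`.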
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannel.java
new file mode 100644
index 0000000000..adccccede7
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannel.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dvc;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+
+public class DvcTaskChannel implements TaskChannel {
+
+    @Override
+    public void cancelApplication(boolean status) {
+
+    }
+
+    @Override
+    public DvcTask createTask(TaskExecutionContext taskRequest) {
+        return new DvcTask(taskRequest);
+    }
+
+    @Override
+    public AbstractParameters parseParameters(ParametersNode parametersNode) {
+        return JSONUtils.parseObject(parametersNode.getTaskParams(), DvcParameters.class);
+    }
+
+    @Override
+    public ResourceParametersHelper getResources(String parameters) {
+        return null;
+    }
+
+}
+
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannelFactory.java
new file mode 100644
index 0000000000..f028d4e0d6
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskChannelFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dvc;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
+import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+import org.apache.dolphinscheduler.spi.params.base.Validate;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
+import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(TaskChannelFactory.class)
+public class DvcTaskChannelFactory implements TaskChannelFactory {
+    @Override
+    public TaskChannel create() {
+        return new DvcTaskChannel();
+    }
+
+    @Override
+    public String getName() {
+        return "DVC";
+    }
+
+    @Override
+    public List<PluginParams> getParams() {
+        List<PluginParams> paramsList = new ArrayList<>();
+
+        InputParam nodeName = InputParam.newBuilder("name", "$t('Node name')").addValidate(Validate.newBuilder().setRequired(true).build()).build();
+
+        RadioParam runFlag = RadioParam.newBuilder("runFlag", "RUN_FLAG").addParamsOptions(new ParamsOptions("NORMAL", "NORMAL", false)).addParamsOptions(new ParamsOptions("FORBIDDEN", "FORBIDDEN", false)).build();
+
+        paramsList.add(nodeName);
+        paramsList.add(runFlag);
+        return paramsList;
+    }
+}
+
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/TaskTypeEnum.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/TaskTypeEnum.java
new file mode 100644
index 0000000000..4cdce3eb23
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/TaskTypeEnum.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dvc;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum TaskTypeEnum {
+
+    @JsonProperty("Upload")
+    UPLOAD,
+    @JsonProperty("Download")
+    DOWNLOAD,
+    @JsonProperty("Init DVC")
+    INIT
+}
\ No newline at end of file
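`TaskTypeEnum` relies on Jackson's `@JsonProperty` so that the display strings the UI sends (`Upload`, `Download`, `Init DVC`) deserialize to the `UPLOAD`, `DOWNLOAD` and `INIT` constants. The minimal TypeScript sketch below mirrors that mapping; `toBackendTaskType` is a hypothetical helper, not part of this commit, and it throws on unknown labels purely for illustration (it makes no claim about how DolphinScheduler's Jackson configuration treats unknown enum values).

```typescript
// Hypothetical mirror of TaskTypeEnum's @JsonProperty names; not part of this commit.
type DvcTaskTypeConstant = 'UPLOAD' | 'DOWNLOAD' | 'INIT'

// JSON value sent by the UI -> Java enum constant it is annotated onto.
const DVC_TASK_TYPE_MAP = new Map<string, DvcTaskTypeConstant>([
  ['Upload', 'UPLOAD'],
  ['Download', 'DOWNLOAD'],
  ['Init DVC', 'INIT']
])

function toBackendTaskType(label: string): DvcTaskTypeConstant {
  const constant = DVC_TASK_TYPE_MAP.get(label)
  if (constant === undefined) {
    // The sketch rejects anything that is not one of the three @JsonProperty values.
    throw new Error(`Unknown DVC task type: ${label}`)
  }
  return constant
}
```

Note that the enum constant names themselves (`INIT`) are not valid inputs: the wire format uses the annotated labels, which is why the UI's `DVC_TASK_TYPE` options use `'Init DVC'` as their value.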
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
new file mode 100644
index 0000000000..2b42c7f77d
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dvc;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import java.util.Date;
+import java.util.UUID;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+        JSONUtils.class,
+        PropertyUtils.class,
+})
+@PowerMockIgnore({"javax.*"})
+@SuppressStaticInitializationFor("org.apache.dolphinscheduler.spi.utils.PropertyUtils")
+public class DvcTaskTest {
+
+    @Before
+    public void before() throws Exception {
+        PowerMockito.mockStatic(PropertyUtils.class);
+    }
+
+    public TaskExecutionContext createContext(DvcParameters dvcParameters) {
+        String parameters = JSONUtils.toJsonString(dvcParameters);
+        TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
+        Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+        Mockito.when(taskExecutionContext.getTaskLogName()).thenReturn("DvcTest");
+        Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp/dolphinscheduler_dvc_test");
+        Mockito.when(taskExecutionContext.getTaskAppId()).thenReturn(UUID.randomUUID().toString());
+        Mockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date());
+        Mockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000);
+        Mockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dolphinscheduler_dvc_test/log");
+        Mockito.when(taskExecutionContext.getEnvironmentConfig()).thenReturn("export PATH=$HOME/anaconda3/bin:$PATH");
+
+        String userName = System.getenv().get("USER");
+        Mockito.when(taskExecutionContext.getTenantCode()).thenReturn(userName);
+
+        TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
+        return taskExecutionContext;
+    }
+
+    private DvcTask initTask(DvcParameters parameters) {
+        TaskExecutionContext taskExecutionContext = createContext(parameters);
+        DvcTask dvcTask = new DvcTask(taskExecutionContext);
+        dvcTask.init();
+        dvcTask.getParameters().setVarPool(taskExecutionContext.getVarPool());
+        return dvcTask;
+
+    }
+
+    @Test
+    public void testDvcUpload() throws Exception {
+        DvcTask dvcTask = initTask(createUploadParameters());
+        Assert.assertEquals(
+                "which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example\n" +
+                "DVC_DATA_PATH=/home/<YOUR-NAME-OR-ORG>/test\n" +
+                "DVC_DATA_LOCATION=test\n" +
+                "DVC_VERSION=iris_v2.3.1\n" +
+                "DVC_MESSAGE=\"add test iris data\"\n" +
+                "git clone $DVC_REPO dvc-repository; cd dvc-repository; pwd\n" +
+                "dvc config core.autostage true --local || exit 1\n" +
+                "dvc add $DVC_DATA_PATH -v -o $DVC_DATA_LOCATION --to-remote || exit 1\n" +
+                "git commit -am \"$DVC_MESSAGE\"\n" +
+                "git tag \"$DVC_VERSION\" -m \"$DVC_MESSAGE\"\n" +
+                "git push --all\n" +
+                "git push --tags", dvcTask.buildCommand());
+
+    }
+
+    @Test
+    public void testDvcDownload() throws Exception {
+        DvcTask dvcTask = initTask(createDownloadParameters());
+        Assert.assertEquals(
+                "which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example\n" +
+                "DVC_DATA_PATH=data\n" +
+                "DVC_DATA_LOCATION=iris\n" +
+                "DVC_VERSION=iris_v2.3.1\n" +
+                "dvc get $DVC_REPO $DVC_DATA_LOCATION -o $DVC_DATA_PATH -v --rev $DVC_VERSION", dvcTask.buildCommand());
+    }
+
+    @Test
+    public void testInitDvc() throws Exception {
+        DvcTask dvcTask = initTask(createInitDvcParameters());
+        Assert.assertEquals(
+                "which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example\n" +
+                "git clone $DVC_REPO dvc-repository; cd dvc-repository; pwd\n" +
+                "dvc init || exit 1\n" +
+                "dvc remote add origin ~/.dvc_test -d\n" +
+                "git commit -am \"init dvc project and add remote\"; git push", dvcTask.buildCommand());
+    }
+
+    private DvcParameters createUploadParameters() {
+        DvcParameters parameters = new DvcParameters();
+        parameters.setDvcTaskType(TaskTypeEnum.UPLOAD);
+        parameters.setDvcRepository("git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example");
+        parameters.setDvcLoadSaveDataPath("/home/<YOUR-NAME-OR-ORG>/test");
+        parameters.setDvcDataLocation("test");
+        parameters.setDvcVersion("iris_v2.3.1");
+        parameters.setDvcMessage("add test iris data");
+        return parameters;
+    }
+
+    private DvcParameters createDownloadParameters() {
+        DvcParameters parameters = new DvcParameters();
+        parameters.setDvcTaskType(TaskTypeEnum.DOWNLOAD);
+        parameters.setDvcRepository("git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example");
+        parameters.setDvcLoadSaveDataPath("data");
+        parameters.setDvcDataLocation("iris");
+        parameters.setDvcVersion("iris_v2.3.1");
+        return parameters;
+    }
+
+    private DvcParameters createInitDvcParameters() {
+        DvcParameters parameters = new DvcParameters();
+        parameters.setDvcTaskType(TaskTypeEnum.INIT);
+        parameters.setDvcRepository("git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example");
+        parameters.setDvcStoreUrl("~/.dvc_test");
+        return parameters;
+    }
+}
\ No newline at end of file
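The expected strings in `DvcTaskTest` document the shell script that `DvcTask.buildCommand()` produces: a `which dvc` guard, then newline-separated variable assignments followed by the `git`/`dvc` steps for the chosen task type. The TypeScript sketch below reproduces those three strings from the same parameter values. It is reverse-engineered from the test expectations only (`DvcTask.java` is not part of this section), `buildDvcCommand` is a hypothetical name, and it assumes the fields each task type needs are already filled in.

```typescript
// Hypothetical re-creation of the scripts asserted in DvcTaskTest; it mirrors the
// expected strings only, not the actual DvcTask implementation.
type DvcTaskType = 'Upload' | 'Download' | 'Init DVC'

interface DvcParams {
  dvcTaskType: DvcTaskType
  dvcRepository: string
  dvcLoadSaveDataPath?: string
  dvcDataLocation?: string
  dvcVersion?: string
  dvcMessage?: string
  dvcStoreUrl?: string
}

// Fail fast when the dvc CLI is not on the worker's PATH.
const DVC_GUARD = 'which dvc || { echo "dvc does not exist"; exit 1; }; '
// Upload and Init work inside a fresh clone of the DVC repository.
const CLONE = 'git clone $DVC_REPO dvc-repository; cd dvc-repository; pwd'

function buildDvcCommand(p: DvcParams): string {
  const lines: string[] = [`DVC_REPO=${p.dvcRepository}`]
  switch (p.dvcTaskType) {
    case 'Upload':
      lines.push(
        `DVC_DATA_PATH=${p.dvcLoadSaveDataPath}`,
        `DVC_DATA_LOCATION=${p.dvcDataLocation}`,
        `DVC_VERSION=${p.dvcVersion}`,
        `DVC_MESSAGE="${p.dvcMessage}"`,
        CLONE,
        'dvc config core.autostage true --local || exit 1',
        'dvc add $DVC_DATA_PATH -v -o $DVC_DATA_LOCATION --to-remote || exit 1',
        'git commit -am "$DVC_MESSAGE"',
        'git tag "$DVC_VERSION" -m "$DVC_MESSAGE"',
        'git push --all',
        'git push --tags'
      )
      break
    case 'Download':
      // No clone needed: `dvc get` fetches a tagged revision directly.
      lines.push(
        `DVC_DATA_PATH=${p.dvcLoadSaveDataPath}`,
        `DVC_DATA_LOCATION=${p.dvcDataLocation}`,
        `DVC_VERSION=${p.dvcVersion}`,
        'dvc get $DVC_REPO $DVC_DATA_LOCATION -o $DVC_DATA_PATH -v --rev $DVC_VERSION'
      )
      break
    case 'Init DVC':
      lines.push(
        CLONE,
        'dvc init || exit 1',
        `dvc remote add origin ${p.dvcStoreUrl} -d`,
        'git commit -am "init dvc project and add remote"; git push'
      )
      break
  }
  return DVC_GUARD + lines.join('\n')
}
```

Because the version doubles as a git tag on upload, a later download only needs the same `DVC_VERSION` value passed to `--rev`.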
diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml
index 43f0209e0f..f40e882d47 100644
--- a/dolphinscheduler-task-plugin/pom.xml
+++ b/dolphinscheduler-task-plugin/pom.xml
@@ -55,5 +55,6 @@
         <module>dolphinscheduler-task-jupyter</module>
         <module>dolphinscheduler-task-mlflow</module>
         <module>dolphinscheduler-task-openmldb</module>
+        <module>dolphinscheduler-task-dvc</module>
     </modules>
-</project>
+</project>
\ No newline at end of file
diff --git a/dolphinscheduler-ui/public/images/task-icons/dvc.png b/dolphinscheduler-ui/public/images/task-icons/dvc.png
new file mode 100644
index 0000000000..404640c862
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/dvc.png differ
diff --git a/dolphinscheduler-ui/public/images/task-icons/dvc_hover.png b/dolphinscheduler-ui/public/images/task-icons/dvc_hover.png
new file mode 100644
index 0000000000..d114743521
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/dvc_hover.png differ
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index df289e5da2..93e643690f 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -676,6 +676,16 @@ export default {
     openmldb_execute_mode_tips: 'Please select the execute mode',
     openmldb_execute_mode_offline: 'offline',
     openmldb_execute_mode_online: 'online',
+    dvc_task_type: 'DVC Task Type',
+    dvc_repository: 'DVC Repository',
+    dvc_repository_tips: 'Please input the URL of the DVC repository',
+    dvc_version: 'Version',
+    dvc_version_tips: 'Data version, which will be marked as a git tag',
+    dvc_data_location: 'Data Path in DVC Repository',
+    dvc_message: 'Version Message',
+    dvc_load_save_data_path: 'Data Path in Worker',
+    dvc_store_url: 'Store URL',
+    dvc_empty_tips: 'This parameter cannot be empty',
     send_email: 'Send Email',
     log_display: 'Log display',
     rows_of_result: 'rows of result',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index 51b0982d36..baed96432e 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -664,6 +664,16 @@ export default {
     openmldb_execute_mode_tips: '请选择执行模式',
     openmldb_execute_mode_offline: '离线',
     openmldb_execute_mode_online: '在线',
+    dvc_task_type: 'DVC任务类型',
+    dvc_repository: 'DVC仓库',
+    dvc_repository_tips: '请输入DVC仓库地址',
+    dvc_version: '数据版本',
+    dvc_version_tips: '数据版本标识,会以git tag的形式标记',
+    dvc_data_location: 'DVC仓库中的数据路径',
+    dvc_message: '提交信息',
+    dvc_load_save_data_path: 'Worker中数据路径',
+    dvc_store_url: '数据存储地址',
+    dvc_empty_tips: '该参数不能为空',
     send_email: '发送邮件',
     log_display: '日志显示',
     rows_of_result: '行查询结果',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
index 89cd27ba83..971fd0447b 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
@@ -70,3 +70,4 @@ export { useMlflow } from './use-mlflow'
 export { useMlflowProjects } from './use-mlflow-projects'
 export { useMlflowModels } from './use-mlflow-models'
 export { useOpenmldb } from './use-openmldb'
+export { useDvc } from './use-dvc'
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dvc.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dvc.ts
new file mode 100644
index 0000000000..0ceac153c3
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dvc.ts
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { useI18n } from 'vue-i18n'
+import type { IJsonItem } from '../types'
+import { watch, ref } from 'vue'
+
+export const DVC_TASK_TYPE = [
+  {
+    label: 'Upload',
+    value: 'Upload'
+  },
+  {
+    label: 'Download',
+    value: 'Download'
+  },
+  {
+    label: 'Init DVC',
+    value: 'Init DVC'
+  }
+]
+
+export function useDvc(model: { [field: string]: any }): IJsonItem[] {
+  const { t } = useI18n()
+
+  const dvcLoadSaveDataPathSpan = ref(0)
+  const dvcDataLocationSpan = ref(0)
+  const dvcVersionSpan = ref(0)
+  const dvcMessageSpan = ref(0)
+  const dvcStoreUrlSpan = ref(0)
+
+  const setFlag = () => {
+    model.isUpload = model.dvcTaskType === 'Upload'
+    model.isDownload = model.dvcTaskType === 'Download'
+    model.isInit = model.dvcTaskType === 'Init DVC'
+  }
+
+  const resetData = () => {
+    dvcLoadSaveDataPathSpan.value = model.isUpload || model.isDownload ? 24 : 0
+    dvcDataLocationSpan.value = model.isUpload || model.isDownload ? 24 : 0
+    dvcVersionSpan.value = model.isUpload || model.isDownload ? 24 : 0
+    dvcMessageSpan.value = model.isUpload ? 24 : 0
+    dvcStoreUrlSpan.value = model.isInit ? 24 : 0
+  }
+
+  watch(
+    () => [model.dvcTaskType],
+    () => {
+      setFlag()
+      resetData()
+    }
+  )
+  setFlag()
+  resetData()
+
+  return [
+    {
+      type: 'select',
+      field: 'dvcTaskType',
+      name: t('project.node.dvc_task_type'),
+      span: 12,
+      options: DVC_TASK_TYPE
+    },
+    {
+      type: 'input',
+      field: 'dvcRepository',
+      name: t('project.node.dvc_repository'),
+      span: 24,
+      props: {
+        placeholder: t('project.node.dvc_repository_tips')
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        required: true,
+        message: t('project.node.dvc_empty_tips')
+      }
+    },
+    {
+      type: 'input',
+      field: 'dvcDataLocation',
+      name: t('project.node.dvc_data_location'),
+      props: { placeholder: 'dvcDataLocation' },
+      span: dvcDataLocationSpan,
+      validate: {
+        trigger: ['input', 'blur'],
+        required: true,
+        validator(validate: any, value: string) {
+          if ((model.isUpload || model.isDownload) && !value) {
+            return new Error(t('project.node.dvc_empty_tips'))
+          }
+        }
+      }
+    },
+    {
+      type: 'input',
+      field: 'dvcLoadSaveDataPath',
+      name: t('project.node.dvc_load_save_data_path'),
+      span: dvcLoadSaveDataPathSpan,
+      validate: {
+        trigger: ['input', 'blur'],
+        required: true,
+        validator(validate: any, value: string) {
+          if ((model.isUpload || model.isDownload) && !value) {
+            return new Error(t('project.node.dvc_empty_tips'))
+          }
+        }
+      }
+    },
+    {
+      type: 'input',
+      field: 'dvcVersion',
+      name: t('project.node.dvc_version'),
+      span: dvcVersionSpan,
+      props: {
+        placeholder: t('project.node.dvc_version_tips')
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        required: true,
+        validator(validate: any, value: string) {
+          if ((model.isUpload || model.isDownload) && !value) {
+            return new Error(t('project.node.dvc_empty_tips'))
+          }
+        }
+      }
+    },
+    {
+      type: 'input',
+      field: 'dvcMessage',
+      name: t('project.node.dvc_message'),
+      span: dvcMessageSpan,
+      validate: {
+        trigger: ['input', 'blur'],
+        required: true,
+        validator(validate: any, value: string) {
+          if (model.isUpload && !value) {
+            return new Error(t('project.node.dvc_empty_tips'))
+          }
+        }
+      }
+    },
+    {
+      type: 'input',
+      field: 'dvcStoreUrl',
+      name: t('project.node.dvc_store_url'),
+      span: dvcStoreUrlSpan,
+      validate: {
+        trigger: ['input', 'blur'],
+        required: true,
+        validator(validate: any, value: string) {
+          if (model.isInit && !value) {
+            return new Error(t('project.node.dvc_empty_tips'))
+          }
+        }
+      }
+    }
+  ]
+}
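`resetData()` shows or hides each field by setting its grid span to 24 (full width) or 0 (hidden), and each validator only reports an empty value while its field is visible for the selected task type. The framework-free sketch below restates that rule outside Vue. `dvcFieldSpans` and `missingDvcFields` are hypothetical names, and the sketch assumes every visible field is required, including Store Url for `Init DVC`.

```typescript
// Hypothetical restatement of the span rules in resetData(); 24 = visible, 0 = hidden.
type DvcTaskType = 'Upload' | 'Download' | 'Init DVC'
type DvcField =
  | 'dvcLoadSaveDataPath'
  | 'dvcDataLocation'
  | 'dvcVersion'
  | 'dvcMessage'
  | 'dvcStoreUrl'

function dvcFieldSpans(taskType: DvcTaskType): Record<DvcField, number> {
  const isUpload = taskType === 'Upload'
  const isDownload = taskType === 'Download'
  const isInit = taskType === 'Init DVC'
  const span = (visible: boolean): number => (visible ? 24 : 0)
  return {
    dvcLoadSaveDataPath: span(isUpload || isDownload),
    dvcDataLocation: span(isUpload || isDownload),
    dvcVersion: span(isUpload || isDownload),
    dvcMessage: span(isUpload),
    dvcStoreUrl: span(isInit)
  }
}

// Assumption: a field is required exactly when it is visible; returns the empty ones.
function missingDvcFields(
  taskType: DvcTaskType,
  values: Partial<Record<DvcField, string>>
): DvcField[] {
  const spans = dvcFieldSpans(taskType)
  return (Object.keys(spans) as DvcField[]).filter(
    (field) => spans[field] > 0 && !values[field]
  )
}
```

Keeping visibility and "required" derived from the same flags avoids the two drifting apart when a new task type is added.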
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 5cf480532a..ae5a2efbae 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -359,6 +359,17 @@ export function formatParams(data: INodeData): {
     taskParams.memoryLimit = data.memoryLimit
   }
 
+  if (data.taskType === 'DVC') {
+
+    taskParams.dvcTaskType = data.dvcTaskType
+    taskParams.dvcRepository = data.dvcRepository
+    taskParams.dvcVersion = data.dvcVersion
+    taskParams.dvcDataLocation = data.dvcDataLocation
+    taskParams.dvcMessage = data.dvcMessage
+    taskParams.dvcLoadSaveDataPath = data.dvcLoadSaveDataPath
+    taskParams.dvcStoreUrl = data.dvcStoreUrl
+  }
+
   if (data.taskType === 'OPENMLDB') {
     taskParams.zk = data.zk
     taskParams.zkPath = data.zkPath
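For a DVC node, `formatParams` copies the seven `dvc*` fields from the form model into `taskParams`, which `DvcTaskChannel.parseParameters` later deserializes into `DvcParameters`. The sketch below isolates that branch under a hypothetical name, `formatDvcParams`, and builds a Download payload from the values used in `DvcTaskTest`. Fields the form never filled in are copied as `undefined`, and `JSON.stringify` omits such keys from the serialized payload.

```typescript
// Hypothetical reduction of the DVC branch of formatParams(); not the real function.
interface DvcFormData {
  taskType: string
  dvcTaskType?: string
  dvcRepository?: string
  dvcVersion?: string
  dvcDataLocation?: string
  dvcMessage?: string
  dvcLoadSaveDataPath?: string
  dvcStoreUrl?: string
}

function formatDvcParams(data: DvcFormData): Record<string, string | undefined> {
  const taskParams: Record<string, string | undefined> = {}
  if (data.taskType === 'DVC') {
    // Same seven assignments as the hunk above; hidden fields are copied too.
    taskParams.dvcTaskType = data.dvcTaskType
    taskParams.dvcRepository = data.dvcRepository
    taskParams.dvcVersion = data.dvcVersion
    taskParams.dvcDataLocation = data.dvcDataLocation
    taskParams.dvcMessage = data.dvcMessage
    taskParams.dvcLoadSaveDataPath = data.dvcLoadSaveDataPath
    taskParams.dvcStoreUrl = data.dvcStoreUrl
  }
  return taskParams
}

const payload = JSON.stringify(
  formatDvcParams({
    taskType: 'DVC',
    dvcTaskType: 'Download',
    dvcRepository: 'git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example',
    dvcVersion: 'iris_v2.3.1',
    dvcDataLocation: 'iris',
    dvcLoadSaveDataPath: 'data'
  })
)
// dvcMessage and dvcStoreUrl are undefined, so they do not appear in the JSON.
console.log(payload)
```

One consequence of copying unconditionally: if a user fills a field, then switches task type so the field is hidden, the stale value is still sent unless the model is cleared.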
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
index a151b70d70..d2a5a6dd01 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
@@ -38,6 +38,7 @@ import { useK8s } from './use-k8s'
 import { useJupyter } from './use-jupyter'
 import { useMlflow } from './use-mlflow'
 import { useOpenmldb } from './use-openmldb'
+import { useDvc } from './use-dvc'
 
 export default {
   SHELL: useShell,
@@ -62,5 +63,6 @@ export default {
   K8S: useK8s,
   JUPYTER: useJupyter,
   MLFLOW: useMlflow,
-  OPENMLDB: useOpenmldb
+  OPENMLDB: useOpenmldb,
+  DVC: useDvc
 }
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dvc.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dvc.ts
new file mode 100644
index 0000000000..5838224e13
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dvc.ts
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import * as Fields from '../fields/index'
+import type { IJsonItem, INodeData, ITaskData } from '../types'
+
+export function useDvc({
+  projectCode,
+  from = 0,
+  readonly,
+  data
+}: {
+  projectCode: number
+  from?: number
+  readonly?: boolean
+  data?: ITaskData
+}) {
+  const model = reactive({
+    name: '',
+    taskType: 'DVC',
+    flag: 'YES',
+    description: '',
+    timeoutFlag: false,
+    localParams: [],
+    environmentCode: null,
+    failRetryInterval: 1,
+    failRetryTimes: 0,
+    workerGroup: 'default',
+    delayTime: 0,
+    timeout: 30,
+    timeoutNotifyStrategy: ['WARN'],
+    dvcTaskType: 'Upload'
+  } as INodeData)
+
+  let extra: IJsonItem[] = []
+  if (from === 1) {
+    extra = [
+      Fields.useTaskType(model, readonly),
+      Fields.useProcessName({
+        model,
+        projectCode,
+        isCreate: !data?.id,
+        from,
+        processName: data?.processName
+      })
+    ]
+  }
+
+  return {
+    json: [
+      Fields.useName(from),
+      ...extra,
+      Fields.useRunFlag(),
+      Fields.useDescription(),
+      Fields.useTaskPriority(),
+      Fields.useWorkerGroup(),
+      Fields.useEnvironmentName(model, !model.id),
+      ...Fields.useTaskGroup(model, projectCode),
+      ...Fields.useFailed(),
+      Fields.useDelayTime(model),
+      ...Fields.useTimeoutAlarm(model),
+      ...Fields.useDvc(model),
+      Fields.usePreTasks()
+    ] as IJsonItem[],
+    model
+  }
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 9535c76d37..7ceca70723 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -341,6 +341,13 @@ interface ITaskParams {
   zk?: string
   zkPath?: string
   executeMode?: string
+  dvcTaskType?: string
+  dvcRepository?: string
+  dvcVersion?: string
+  dvcDataLocation?: string
+  dvcMessage?: string
+  dvcLoadSaveDataPath?: string
+  dvcStoreUrl?: string
 }
 
 interface INodeData
diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
index 26cd32f544..ebd3f741e1 100644
--- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
@@ -38,6 +38,7 @@ export type TaskType =
   | 'JUPYTER'
   | 'MLFLOW'
   | 'OPENMLDB'
+  | 'DVC'
 
 export const TASK_TYPES_MAP = {
   SHELL: {
@@ -118,5 +119,9 @@ export const TASK_TYPES_MAP = {
   OPENMLDB: {
     alias: 'OPENMLDB',
     helperLinkDisable: true
+  },
+  DVC: {
+    alias: 'DVC',
+    helperLinkDisable: true
   }
 } as { [key in TaskType]: { alias: string; helperLinkDisable?: boolean } }
diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
index bbe539d813..656e5ae890 100644
--- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
+++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
@@ -170,6 +170,9 @@ $bgLight: #ffffff;
     &.icon-openmldb {
       background-image: url('/images/task-icons/openmldb.png');
     }
+    &.icon-dvc {
+      background-image: url('/images/task-icons/dvc.png');
+    }
   }
 
   &:hover {
@@ -243,6 +246,9 @@ $bgLight: #ffffff;
       &.icon-openmldb {
         background-image: url('/images/task-icons/openmldb_hover.png');
       }
+      &.icon-dvc {
+        background-image: url('/images/task-icons/dvc_hover.png');
+      }
     }
   }
 }