You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/17 08:56:01 UTC

[dolphinscheduler] branch dev updated: [Feature][PyDolphinScheduler] Support DVC task in pyds #11922 (#11941)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new bbc228e93f [Feature][PyDolphinScheduler] Support DVC task in pyds #11922 (#11941)
bbc228e93f is described below

commit bbc228e93f1ddd372bad0c06e50658583286ffd6
Author: JieguangZhou <ji...@163.com>
AuthorDate: Sat Sep 17 16:55:51 2022 +0800

    [Feature][PyDolphinScheduler] Support DVC task in pyds #11922 (#11941)
    
    * add dvc task in pyds
    
    * add BaseDVC class
---
 .../docs/source/tasks/{index.rst => dvc.rst}       |  53 +++----
 .../pydolphinscheduler/docs/source/tasks/index.rst |   1 +
 .../examples/yaml_define/Dvc.yaml                  |  46 ++++++
 .../src/pydolphinscheduler/constants.py            |   1 +
 .../examples/task_dvc_example.py                   |  52 +++++++
 .../src/pydolphinscheduler/tasks/__init__.py       |   4 +
 .../src/pydolphinscheduler/tasks/dvc.py            | 124 +++++++++++++++
 .../pydolphinscheduler/tests/tasks/test_dvc.py     | 173 +++++++++++++++++++++
 .../plugin/task/dvc/DvcConstants.java              |   6 +
 .../plugin/task/dvc/DvcParameters.java             |  91 ++++-------
 .../dolphinscheduler/plugin/task/dvc/DvcTask.java  |   8 +-
 .../plugin/task/dvc/TaskTypeEnum.java              |  30 ----
 .../plugin/task/dvc/DvcTaskTest.java               |   6 +-
 13 files changed, 469 insertions(+), 126 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dvc.rst
similarity index 62%
copy from dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
copy to dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dvc.rst
index 5b9c165700..0127a982f3 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dvc.rst
@@ -15,32 +15,27 @@
    specific language governing permissions and limitations
    under the License.
 
-Tasks
-=====
-
-In this section 
-
-.. toctree::
-   :maxdepth: 1
-   
-   func_wrap
-   shell
-   sql
-   python
-   http
-
-   switch
-   condition
-   dependent
-
-   spark
-   flink
-   map_reduce
-   procedure
-
-   datax
-   sub_process
-
-   sagemaker
-   openmldb
-   pytorch
+DVC
+===
+
+An example of the DVC task type in **PyDolphinScheduler**, followed by a closer look at its API.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_dvc_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.dvc
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Dvc.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
index 5b9c165700..c0cea593d8 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
@@ -44,3 +44,4 @@ In this section
    sagemaker
    openmldb
    pytorch
+   dvc
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dvc.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dvc.yaml
new file mode 100644
index 0000000000..a6ec18c372
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dvc.yaml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define variable `repository`
+repository: &repository "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git" 
+
+# Define the workflow
+workflow:
+  name: "DVC"
+  release_state: "offline"
+
+# Define the tasks under the workflow
+tasks:
+  - name: init_dvc 
+    task_type: DVCInit
+    repository: *repository
+    store_url: ~/dvc_data
+
+  - name: upload_data
+    task_type: DVCUpload
+    repository: *repository
+    data_path_in_dvc_repository: "iris"
+    data_path_in_worker: ~/source/iris
+    version: v1
+    message: upload iris data v1
+
+  - name: download_data
+    task_type: DVCDownload
+    repository: *repository
+    data_path_in_dvc_repository: "iris"
+    data_path_in_worker: ~/target/iris
+    version: v1
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index d8d2febfeb..b4a89bb585 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -60,6 +60,7 @@ class TaskType(str):
     SAGEMAKER = "SAGEMAKER"
     OPENMLDB = "OPENMLDB"
     PYTORCH = "PYTORCH"
+    DVC = "DVC"
 
 
 class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_dvc_example.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_dvc_example.py
new file mode 100644
index 0000000000..2b93cd14b7
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_dvc_example.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+"""An example workflow for the DVC task."""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks import DVCDownload, DVCInit, DVCUpload
+
+repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"
+
+with ProcessDefinition(
+    name="task_dvc_example",
+    tenant="tenant_exists",
+) as pd:
+    init_task = DVCInit(name="init_dvc", repository=repository, store_url="~/dvc_data")
+    upload_task = DVCUpload(
+        name="upload_data",
+        repository=repository,
+        data_path_in_dvc_repository="iris",
+        data_path_in_worker="~/source/iris",
+        version="v1",
+        message="upload iris data v1",
+    )
+
+    download_task = DVCDownload(
+        name="download_data",
+        repository=repository,
+        data_path_in_dvc_repository="iris",
+        data_path_in_worker="~/target/iris",
+        version="v1",
+    )
+
+    init_task >> upload_task >> download_task
+
+    pd.run()
+
+# [end workflow_declare]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
index e5b263c7c2..cefc0024ca 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
@@ -20,6 +20,7 @@
 from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Condition, Or
 from pydolphinscheduler.tasks.datax import CustomDataX, DataX
 from pydolphinscheduler.tasks.dependent import Dependent
+from pydolphinscheduler.tasks.dvc import DVCDownload, DVCInit, DVCUpload
 from pydolphinscheduler.tasks.flink import Flink
 from pydolphinscheduler.tasks.http import Http
 from pydolphinscheduler.tasks.map_reduce import MR
@@ -39,6 +40,9 @@ __all__ = [
     "DataX",
     "CustomDataX",
     "Dependent",
+    "DVCInit",
+    "DVCUpload",
+    "DVCDownload",
     "Flink",
     "Http",
     "MR",
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dvc.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dvc.py
new file mode 100644
index 0000000000..c5b5cd5c91
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dvc.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task dvc."""
+from copy import deepcopy
+from typing import Dict
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+
+
+class DvcTaskType(str):
+    """Constants for dvc task type."""
+
+    INIT = "Init DVC"
+    DOWNLOAD = "Download"
+    UPLOAD = "Upload"
+
+
+class BaseDVC(Task):
+    """Base class for dvc task."""
+
+    dvc_task_type = None
+
+    _task_custom_attr = {
+        "dvc_task_type",
+        "dvc_repository",
+    }
+
+    _child_task_dvc_attr = set()
+
+    def __init__(self, name: str, repository: str, *args, **kwargs):
+        super().__init__(name, TaskType.DVC, *args, **kwargs)
+        self.dvc_repository = repository
+
+    @property
+    def task_params(self) -> Dict:
+        """Return task params."""
+        self._task_custom_attr = deepcopy(self._task_custom_attr)
+        self._task_custom_attr.update(self._child_task_dvc_attr)
+        return super().task_params
+
+
+class DVCInit(BaseDVC):
+    """Task DVC Init object, declare behavior for DVC Init task to dolphinscheduler."""
+
+    dvc_task_type = DvcTaskType.INIT
+
+    _child_task_dvc_attr = {"dvc_store_url"}
+
+    def __init__(self, name: str, repository: str, store_url: str, *args, **kwargs):
+        super().__init__(name, repository, *args, **kwargs)
+        self.dvc_store_url = store_url
+
+
+class DVCDownload(BaseDVC):
+    """Task DVC Download object, declare behavior for DVC Download task to dolphinscheduler."""
+
+    dvc_task_type = DvcTaskType.DOWNLOAD
+
+    _child_task_dvc_attr = {
+        "dvc_load_save_data_path",
+        "dvc_data_location",
+        "dvc_version",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        repository: str,
+        data_path_in_dvc_repository: str,
+        data_path_in_worker: str,
+        version: str,
+        *args,
+        **kwargs
+    ):
+        super().__init__(name, repository, *args, **kwargs)
+        self.dvc_data_location = data_path_in_dvc_repository
+        self.dvc_load_save_data_path = data_path_in_worker
+        self.dvc_version = version
+
+
+class DVCUpload(BaseDVC):
+    """Task DVC Upload object, declare behavior for DVC Upload task to dolphinscheduler."""
+
+    dvc_task_type = DvcTaskType.UPLOAD
+
+    _child_task_dvc_attr = {
+        "dvc_load_save_data_path",
+        "dvc_data_location",
+        "dvc_version",
+        "dvc_message",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        repository: str,
+        data_path_in_worker: str,
+        data_path_in_dvc_repository: str,
+        version: str,
+        message: str,
+        *args,
+        **kwargs
+    ):
+        super().__init__(name, repository, *args, **kwargs)
+        self.dvc_data_location = data_path_in_dvc_repository
+        self.dvc_load_save_data_path = data_path_in_worker
+        self.dvc_version = version
+        self.dvc_message = message
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dvc.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dvc.py
new file mode 100644
index 0000000000..815d896234
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dvc.py
@@ -0,0 +1,173 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Dvc."""
+from unittest.mock import patch
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.tasks.dvc import DVCDownload, DVCInit, DvcTaskType, DVCUpload
+
+repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"
+
+
+def test_dvc_init_get_define():
+    """Test task dvc init function get_define."""
+    name = "test_dvc_init"
+    dvc_store_url = "~/dvc_data"
+
+    code = 123
+    version = 1
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": TaskType.DVC,
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "dvcTaskType": DvcTaskType.INIT,
+            "dvcRepository": repository,
+            "dvcStoreUrl": dvc_store_url,
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "environmentCode": None,
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        dvc_init = DVCInit(name, repository, dvc_store_url)
+        assert dvc_init.get_define() == expect
+
+
+def test_dvc_upload_get_define():
+    """Test task dvc upload function get_define."""
+    name = "test_dvc_upload"
+    data_path_in_dvc_repository = "iris"
+    data_path_in_worker = "~/source/iris"
+    version = "v1"
+    message = "upload iris data v1"
+
+    code = 123
+    version = 1
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": TaskType.DVC,
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "dvcTaskType": DvcTaskType.UPLOAD,
+            "dvcRepository": repository,
+            "dvcDataLocation": data_path_in_dvc_repository,
+            "dvcLoadSaveDataPath": data_path_in_worker,
+            "dvcVersion": version,
+            "dvcMessage": message,
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "environmentCode": None,
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        dvc_upload = DVCUpload(
+            name,
+            repository=repository,
+            data_path_in_dvc_repository=data_path_in_dvc_repository,
+            data_path_in_worker=data_path_in_worker,
+            version=version,
+            message=message,
+        )
+        assert dvc_upload.get_define() == expect
+
+
+def test_dvc_download_get_define():
+    """Test task dvc download function get_define."""
+    name = "test_dvc_upload"
+    data_path_in_dvc_repository = "iris"
+    data_path_in_worker = "~/target/iris"
+    version = "v1"
+
+    code = 123
+    version = 1
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": TaskType.DVC,
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "dvcTaskType": DvcTaskType.DOWNLOAD,
+            "dvcRepository": repository,
+            "dvcDataLocation": data_path_in_dvc_repository,
+            "dvcLoadSaveDataPath": data_path_in_worker,
+            "dvcVersion": version,
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "environmentCode": None,
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        dvc_download = DVCDownload(
+            name,
+            repository=repository,
+            data_path_in_dvc_repository=data_path_in_dvc_repository,
+            data_path_in_worker=data_path_in_worker,
+            version=version,
+        )
+        assert dvc_download.get_define() == expect
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java
index b15d6d450c..6f65bedc74 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java
@@ -22,6 +22,12 @@ public class DvcConstants {
         throw new IllegalStateException("Utility class");
     }
 
+    public static final class DVC_TASK_TYPE {
+        public static final String UPLOAD = "Upload";
+        public static final String DOWNLOAD = "Download";
+        public static final String INIT = "Init DVC";
+    };
+
     public static final String CHECK_AND_SET_DVC_REPO = "which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=%s";
 
     public static final String SET_DATA_PATH = "DVC_DATA_PATH=%s";
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java
index 97c404cb29..ec93e48284 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java
@@ -17,15 +17,19 @@
 
 package org.apache.dolphinscheduler.plugin.task.dvc;
 
+import lombok.Data;
+
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
+@Data
 public class DvcParameters extends AbstractParameters {
 
     /**
      * common parameters
      */
 
-    private TaskTypeEnum dvcTaskType;
+    private String dvcTaskType;
 
     private String dvcRepository;
 
@@ -39,67 +43,34 @@ public class DvcParameters extends AbstractParameters {
 
     private String dvcStoreUrl;
 
-    public void setDvcTaskType(TaskTypeEnum dvcTaskType) {
-        this.dvcTaskType = dvcTaskType;
-    }
-
-    public TaskTypeEnum getDvcTaskType() {
-        return dvcTaskType;
-    }
-
-    public void setDvcRepository(String dvcRepository) {
-        this.dvcRepository = dvcRepository;
-    }
-
-    public String getDvcRepository() {
-        return dvcRepository;
-    }
-
-    public void setDvcVersion(String dvcVersion) {
-        this.dvcVersion = dvcVersion;
-    }
-
-    public String getDvcVersion() {
-        return dvcVersion;
-    }
-
-    public void setDvcDataLocation(String dvcDataLocation) {
-        this.dvcDataLocation = dvcDataLocation;
-    }
-
-    public String getDvcDataLocation() {
-        return dvcDataLocation;
-    }
-
-    public void setDvcMessage(String dvcMessage) {
-        this.dvcMessage = dvcMessage;
-    }
-
-    public String getDvcMessage() {
-        return dvcMessage;
-    }
-
-    public void setDvcLoadSaveDataPath(String dvcLoadSaveDataPath) {
-        this.dvcLoadSaveDataPath = dvcLoadSaveDataPath;
-    }
-
-    public String getDvcLoadSaveDataPath() {
-        return dvcLoadSaveDataPath;
-    }
-
-    public void setDvcStoreUrl(String dvcStoreUrl) {
-        this.dvcStoreUrl = dvcStoreUrl;
-    }
-
-    public String getDvcStoreUrl() {
-        return dvcStoreUrl;
-    }
-
     @Override
     public boolean checkParameters() {
-        Boolean checkResult = true;
-        return checkResult;
-    }
 
+        if (StringUtils.isEmpty(dvcTaskType)) {
+            return false;
+        }
+
+        switch (dvcTaskType) {
+            case DvcConstants.DVC_TASK_TYPE.UPLOAD:
+                return StringUtils.isNotEmpty(dvcRepository) &&
+                    StringUtils.isNotEmpty(dvcDataLocation) &&
+                    StringUtils.isNotEmpty(dvcLoadSaveDataPath) &&
+                    StringUtils.isNotEmpty(dvcVersion) &&
+                    StringUtils.isNotEmpty(dvcMessage);
+
+            case DvcConstants.DVC_TASK_TYPE.DOWNLOAD:
+                return StringUtils.isNotEmpty(dvcRepository) &&
+                    StringUtils.isNotEmpty(dvcDataLocation) &&
+                    StringUtils.isNotEmpty(dvcLoadSaveDataPath) &&
+                    StringUtils.isNotEmpty(dvcVersion);
+
+            case DvcConstants.DVC_TASK_TYPE.INIT:
+                return StringUtils.isNotEmpty(dvcRepository) &&
+                    StringUtils.isNotEmpty(dvcStoreUrl);
+
+            default:
+                return false;
+        }
+    }
 }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
index eb08e55f1a..d049f8360c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
@@ -107,12 +107,12 @@ public class DvcTask extends AbstractTask {
 
     public String buildCommand() {
         String command = "";
-        TaskTypeEnum taskType = parameters.getDvcTaskType();
-        if (taskType == TaskTypeEnum.UPLOAD) {
+        String taskType = parameters.getDvcTaskType();
+        if (taskType.equals(DvcConstants.DVC_TASK_TYPE.UPLOAD)) {
             command = buildUploadCommond();
-        } else if (taskType == TaskTypeEnum.DOWNLOAD) {
+        } else if (taskType.equals(DvcConstants.DVC_TASK_TYPE.DOWNLOAD)) {
             command = buildDownCommond();
-        } else if (taskType == TaskTypeEnum.INIT) {
+        } else if (taskType.equals(DvcConstants.DVC_TASK_TYPE.INIT)) {
             command = buildInitDvcCommond();
         }
         logger.info("Run DVC task with command: \n{}", command);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/TaskTypeEnum.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/TaskTypeEnum.java
deleted file mode 100644
index 4cdce3eb23..0000000000
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/TaskTypeEnum.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.dvc;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public enum TaskTypeEnum {
-
-    @JsonProperty("Upload")
-    UPLOAD,
-    @JsonProperty("Download")
-    DOWNLOAD,
-    @JsonProperty("Init DVC")
-    INIT
-}
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
index 2b42c7f77d..ec483d86fb 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
@@ -124,7 +124,7 @@ public class DvcTaskTest {
 
     private DvcParameters createUploadParameters() {
         DvcParameters parameters = new DvcParameters();
-        parameters.setDvcTaskType(TaskTypeEnum.UPLOAD);
+        parameters.setDvcTaskType(DvcConstants.DVC_TASK_TYPE.UPLOAD);
         parameters.setDvcRepository("git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example");
         parameters.setDvcLoadSaveDataPath("/home/<YOUR-NAME-OR-ORG>/test");
         parameters.setDvcDataLocation("test");
@@ -135,7 +135,7 @@ public class DvcTaskTest {
 
     private DvcParameters createDownloadParameters() {
         DvcParameters parameters = new DvcParameters();
-        parameters.setDvcTaskType(TaskTypeEnum.DOWNLOAD);
+        parameters.setDvcTaskType(DvcConstants.DVC_TASK_TYPE.DOWNLOAD);
         parameters.setDvcRepository("git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example");
         parameters.setDvcLoadSaveDataPath("data");
         parameters.setDvcDataLocation("iris");
@@ -145,7 +145,7 @@ public class DvcTaskTest {
 
     private DvcParameters createInitDvcParameters() {
         DvcParameters parameters = new DvcParameters();
-        parameters.setDvcTaskType(TaskTypeEnum.INIT);
+        parameters.setDvcTaskType(DvcConstants.DVC_TASK_TYPE.INIT);
         parameters.setDvcRepository("git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example");
         parameters.setDvcStoreUrl("~/.dvc_test");
         return parameters;