You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/17 08:56:01 UTC
[dolphinscheduler] branch dev updated: [Feature][PyDolphinScheduler] Support DVC task in pyds #11922 (#11941)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new bbc228e93f [Feature][PyDolphinScheduler] Support DVC task in pyds #11922 (#11941)
bbc228e93f is described below
commit bbc228e93f1ddd372bad0c06e50658583286ffd6
Author: JieguangZhou <ji...@163.com>
AuthorDate: Sat Sep 17 16:55:51 2022 +0800
[Feature][PyDolphinScheduler] Support DVC task in pyds #11922 (#11941)
* add dvc task in pyds
* add BaseDVC class
---
.../docs/source/tasks/{index.rst => dvc.rst} | 53 +++----
.../pydolphinscheduler/docs/source/tasks/index.rst | 1 +
.../examples/yaml_define/Dvc.yaml | 46 ++++++
.../src/pydolphinscheduler/constants.py | 1 +
.../examples/task_dvc_example.py | 52 +++++++
.../src/pydolphinscheduler/tasks/__init__.py | 4 +
.../src/pydolphinscheduler/tasks/dvc.py | 124 +++++++++++++++
.../pydolphinscheduler/tests/tasks/test_dvc.py | 173 +++++++++++++++++++++
.../plugin/task/dvc/DvcConstants.java | 6 +
.../plugin/task/dvc/DvcParameters.java | 91 ++++-------
.../dolphinscheduler/plugin/task/dvc/DvcTask.java | 8 +-
.../plugin/task/dvc/TaskTypeEnum.java | 30 ----
.../plugin/task/dvc/DvcTaskTest.java | 6 +-
13 files changed, 469 insertions(+), 126 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dvc.rst
similarity index 62%
copy from dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
copy to dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dvc.rst
index 5b9c165700..0127a982f3 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dvc.rst
@@ -15,32 +15,27 @@
specific language governing permissions and limitations
under the License.
-Tasks
-=====
-
-In this section
-
-.. toctree::
- :maxdepth: 1
-
- func_wrap
- shell
- sql
- python
- http
-
- switch
- condition
- dependent
-
- spark
- flink
- map_reduce
- procedure
-
- datax
- sub_process
-
- sagemaker
- openmldb
- pytorch
+DVC
+===
+
+An example of the DVC task type, followed by detailed information about it in **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_dvc_example.py
+ :start-after: [start workflow_declare]
+ :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.dvc
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Dvc.yaml
+ :start-after: # under the License.
+ :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
index 5b9c165700..c0cea593d8 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
@@ -44,3 +44,4 @@ In this section
sagemaker
openmldb
pytorch
+ dvc
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dvc.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dvc.yaml
new file mode 100644
index 0000000000..a6ec18c372
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dvc.yaml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define variable `repository`
+repository: &repository "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"
+
+# Define the workflow
+workflow:
+ name: "DVC"
+ release_state: "offline"
+
+# Define the tasks under the workflow
+tasks:
+ - name: init_dvc
+ task_type: DVCInit
+ repository: *repository
+ store_url: ~/dvc_data
+
+ - name: upload_data
+ task_type: DVCUpload
+ repository: *repository
+ data_path_in_dvc_repository: "iris"
+ data_path_in_worker: ~/source/iris
+ version: v1
+ message: upload iris data v1
+
+ - name: download_data
+ task_type: DVCDownload
+ repository: *repository
+ data_path_in_dvc_repository: "iris"
+ data_path_in_worker: ~/target/iris
+ version: v1
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index d8d2febfeb..b4a89bb585 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -60,6 +60,7 @@ class TaskType(str):
SAGEMAKER = "SAGEMAKER"
OPENMLDB = "OPENMLDB"
PYTORCH = "PYTORCH"
+ DVC = "DVC"
class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_dvc_example.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_dvc_example.py
new file mode 100644
index 0000000000..2b93cd14b7
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_dvc_example.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
# [start workflow_declare]
"""An example workflow for task dvc.

It chains three DVC tasks: initialise DVC for ``repository``, upload a data
set into it, then download the same data version onto the worker again.
"""

from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks import DVCDownload, DVCInit, DVCUpload

# Placeholder: point this at your own DVC data repository.
repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"

with ProcessDefinition(
    name="task_dvc_example",
    tenant="tenant_exists",
) as pd:
    init_task = DVCInit(name="init_dvc", repository=repository, store_url="~/dvc_data")
    upload_task = DVCUpload(
        name="upload_data",
        repository=repository,
        data_path_in_dvc_repository="iris",
        data_path_in_worker="~/source/iris",
        version="v1",
        message="upload iris data v1",
    )

    download_task = DVCDownload(
        name="download_data",
        repository=repository,
        data_path_in_dvc_repository="iris",
        data_path_in_worker="~/target/iris",
        version="v1",
    )

    # Run in order: init -> upload -> download.
    init_task >> upload_task >> download_task

    pd.run()

# [end workflow_declare]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
index e5b263c7c2..cefc0024ca 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
@@ -20,6 +20,7 @@
from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Condition, Or
from pydolphinscheduler.tasks.datax import CustomDataX, DataX
from pydolphinscheduler.tasks.dependent import Dependent
+from pydolphinscheduler.tasks.dvc import DVCDownload, DVCInit, DVCUpload
from pydolphinscheduler.tasks.flink import Flink
from pydolphinscheduler.tasks.http import Http
from pydolphinscheduler.tasks.map_reduce import MR
@@ -39,6 +40,9 @@ __all__ = [
"DataX",
"CustomDataX",
"Dependent",
+ "DVCInit",
+ "DVCUpload",
+ "DVCDownload",
"Flink",
"Http",
"MR",
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dvc.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dvc.py
new file mode 100644
index 0000000000..c5b5cd5c91
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dvc.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task dvc."""
+from copy import deepcopy
+from typing import Dict
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+
+
class DvcTaskType(str):
    """Names of the DVC sub-task kinds, sent verbatim as ``dvcTaskType``.

    The values must stay identical to ``DvcConstants.DVC_TASK_TYPE`` in the
    Java DVC task plugin, which compares against these exact strings.
    """

    # Upload data from the worker into the DVC repository.
    UPLOAD = "Upload"
    # Download data from the DVC repository onto the worker.
    DOWNLOAD = "Download"
    # Initialise DVC for a repository.
    INIT = "Init DVC"
+
+
class BaseDVC(Task):
    """Shared base for every DVC task.

    Concrete subclasses choose the operation by overriding ``dvc_task_type``
    and list their own extra parameter names in ``_child_task_dvc_attr``;
    those names are merged into ``_task_custom_attr`` when ``task_params``
    is read.

    :param name: Task name.
    :param repository: DVC repository address, stored as ``dvc_repository``.
    """

    # Overridden by each concrete subclass with a ``DvcTaskType`` value.
    dvc_task_type = None

    # Parameter names common to every DVC task.
    _task_custom_attr = {
        "dvc_task_type",
        "dvc_repository",
    }

    # Extra, subclass-specific parameter names (empty for the base class).
    _child_task_dvc_attr = set()

    def __init__(self, name: str, repository: str, *args, **kwargs):
        super().__init__(name, TaskType.DVC, *args, **kwargs)
        self.dvc_repository = repository

    @property
    def task_params(self) -> Dict:
        """Return task params, including the subclass-specific DVC attributes."""
        # Work on a private copy bound to the instance so the class-level set
        # shared by every instance is never mutated by the update below.
        merged = deepcopy(self._task_custom_attr)
        merged.update(self._child_task_dvc_attr)
        self._task_custom_attr = merged
        return super().task_params
+
+
class DVCInit(BaseDVC):
    """DVC task that initialises DVC for a repository.

    :param name: Task name.
    :param repository: DVC repository address.
    :param store_url: DVC store location (e.g. ``~/dvc_data``), sent as
        ``dvcStoreUrl``.
    """

    dvc_task_type = DvcTaskType.INIT

    # Only the store location is specific to the init task.
    _child_task_dvc_attr = {"dvc_store_url"}

    def __init__(self, name: str, repository: str, store_url: str, *args, **kwargs):
        super().__init__(name, repository, *args, **kwargs)
        self.dvc_store_url = store_url
+
+
class DVCDownload(BaseDVC):
    """DVC task that fetches a given data version from a DVC repository.

    :param name: Task name.
    :param repository: DVC repository address.
    :param data_path_in_dvc_repository: Location of the data inside the DVC
        repository, sent as ``dvcDataLocation``.
    :param data_path_in_worker: Path on the worker for the data, sent as
        ``dvcLoadSaveDataPath``.
    :param version: Data version to fetch, sent as ``dvcVersion``.
    """

    dvc_task_type = DvcTaskType.DOWNLOAD

    _child_task_dvc_attr = {
        "dvc_data_location",
        "dvc_load_save_data_path",
        "dvc_version",
    }

    def __init__(
        self,
        name: str,
        repository: str,
        data_path_in_dvc_repository: str,
        data_path_in_worker: str,
        version: str,
        *args,
        **kwargs
    ):
        super().__init__(name, repository, *args, **kwargs)
        # Map the user-facing argument names onto the plugin's field names.
        (
            self.dvc_data_location,
            self.dvc_load_save_data_path,
            self.dvc_version,
        ) = (data_path_in_dvc_repository, data_path_in_worker, version)
+
+
class DVCUpload(BaseDVC):
    """DVC task that stores worker data as a new version in a DVC repository.

    Note: positionally, ``data_path_in_worker`` comes *before*
    ``data_path_in_dvc_repository`` here, the reverse of ``DVCDownload``;
    prefer keyword arguments.

    :param name: Task name.
    :param repository: DVC repository address.
    :param data_path_in_worker: Path on the worker holding the data, sent as
        ``dvcLoadSaveDataPath``.
    :param data_path_in_dvc_repository: Location of the data inside the DVC
        repository, sent as ``dvcDataLocation``.
    :param version: Version label for the uploaded data, sent as ``dvcVersion``.
    :param message: Message recorded with the upload, sent as ``dvcMessage``.
    """

    dvc_task_type = DvcTaskType.UPLOAD

    _child_task_dvc_attr = {
        "dvc_data_location",
        "dvc_load_save_data_path",
        "dvc_message",
        "dvc_version",
    }

    def __init__(
        self,
        name: str,
        repository: str,
        data_path_in_worker: str,
        data_path_in_dvc_repository: str,
        version: str,
        message: str,
        *args,
        **kwargs
    ):
        super().__init__(name, repository, *args, **kwargs)
        # Map the user-facing argument names onto the plugin's field names.
        (
            self.dvc_data_location,
            self.dvc_load_save_data_path,
            self.dvc_version,
            self.dvc_message,
        ) = (data_path_in_dvc_repository, data_path_in_worker, version, message)
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dvc.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dvc.py
new file mode 100644
index 0000000000..815d896234
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dvc.py
@@ -0,0 +1,173 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Dvc."""
+from unittest.mock import patch
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.tasks.dvc import DVCDownload, DVCInit, DvcTaskType, DVCUpload
+
+repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"
+
+
def test_dvc_init_get_define():
    """Check that ``DVCInit.get_define`` yields the expected task definition."""
    name = "test_dvc_init"
    dvc_store_url = "~/dvc_data"
    code, version = 123, 1

    # Parameters specific to this task, nested under "taskParams".
    task_params = {
        "resourceList": [],
        "localParams": [],
        "dvcTaskType": DvcTaskType.INIT,
        "dvcRepository": repository,
        "dvcStoreUrl": dvc_store_url,
        "dependence": {},
        "conditionResult": {"successNode": [""], "failedNode": [""]},
        "waitStartTimeout": {},
    }
    expect = {
        "code": code,
        "name": name,
        "version": version,
        "description": None,
        "delayTime": 0,
        "taskType": TaskType.DVC,
        "taskParams": task_params,
        "flag": "YES",
        "taskPriority": "MEDIUM",
        "workerGroup": "default",
        "environmentCode": None,
        "failRetryTimes": 0,
        "failRetryInterval": 1,
        "timeoutFlag": "CLOSE",
        "timeoutNotifyStrategy": None,
        "timeout": 0,
    }

    # Pin code/version so the definition does not depend on a live server.
    with patch(
        "pydolphinscheduler.core.task.Task.gen_code_and_version",
        return_value=(code, version),
    ):
        task = DVCInit(name, repository, dvc_store_url)
        assert task.get_define() == expect
+
+
def test_dvc_upload_get_define():
    """Test task dvc upload function get_define."""
    name = "test_dvc_upload"
    data_path_in_dvc_repository = "iris"
    data_path_in_worker = "~/source/iris"
    # The DVC data version must not share a name with the task definition
    # ``version`` below: reusing one name silently sent the integer 1 as the
    # DVC version instead of "v1".
    dvc_version = "v1"
    message = "upload iris data v1"

    code = 123
    version = 1
    expect = {
        "code": code,
        "name": name,
        "version": 1,
        "description": None,
        "delayTime": 0,
        "taskType": TaskType.DVC,
        "taskParams": {
            "resourceList": [],
            "localParams": [],
            "dvcTaskType": DvcTaskType.UPLOAD,
            "dvcRepository": repository,
            "dvcDataLocation": data_path_in_dvc_repository,
            "dvcLoadSaveDataPath": data_path_in_worker,
            "dvcVersion": dvc_version,
            "dvcMessage": message,
            "dependence": {},
            "conditionResult": {"successNode": [""], "failedNode": [""]},
            "waitStartTimeout": {},
        },
        "flag": "YES",
        "taskPriority": "MEDIUM",
        "workerGroup": "default",
        "environmentCode": None,
        "failRetryTimes": 0,
        "failRetryInterval": 1,
        "timeoutFlag": "CLOSE",
        "timeoutNotifyStrategy": None,
        "timeout": 0,
    }
    with patch(
        "pydolphinscheduler.core.task.Task.gen_code_and_version",
        return_value=(code, version),
    ):
        dvc_upload = DVCUpload(
            name,
            repository=repository,
            data_path_in_dvc_repository=data_path_in_dvc_repository,
            data_path_in_worker=data_path_in_worker,
            version=dvc_version,
            message=message,
        )
        assert dvc_upload.get_define() == expect
+
+
def test_dvc_download_get_define():
    """Test task dvc download function get_define."""
    name = "test_dvc_download"
    data_path_in_dvc_repository = "iris"
    data_path_in_worker = "~/target/iris"
    # Kept separate from the task definition ``version`` below so the real
    # string DVC version is what gets sent and checked.
    dvc_version = "v1"

    code = 123
    version = 1
    expect = {
        "code": code,
        "name": name,
        "version": 1,
        "description": None,
        "delayTime": 0,
        "taskType": TaskType.DVC,
        "taskParams": {
            "resourceList": [],
            "localParams": [],
            "dvcTaskType": DvcTaskType.DOWNLOAD,
            "dvcRepository": repository,
            "dvcDataLocation": data_path_in_dvc_repository,
            "dvcLoadSaveDataPath": data_path_in_worker,
            "dvcVersion": dvc_version,
            "dependence": {},
            "conditionResult": {"successNode": [""], "failedNode": [""]},
            "waitStartTimeout": {},
        },
        "flag": "YES",
        "taskPriority": "MEDIUM",
        "workerGroup": "default",
        "environmentCode": None,
        "failRetryTimes": 0,
        "failRetryInterval": 1,
        "timeoutFlag": "CLOSE",
        "timeoutNotifyStrategy": None,
        "timeout": 0,
    }
    with patch(
        "pydolphinscheduler.core.task.Task.gen_code_and_version",
        return_value=(code, version),
    ):
        dvc_download = DVCDownload(
            name,
            repository=repository,
            data_path_in_dvc_repository=data_path_in_dvc_repository,
            data_path_in_worker=data_path_in_worker,
            version=dvc_version,
        )
        assert dvc_download.get_define() == expect
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java
index b15d6d450c..6f65bedc74 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcConstants.java
@@ -22,6 +22,12 @@ public class DvcConstants {
throw new IllegalStateException("Utility class");
}
+ public static final class DVC_TASK_TYPE {
+ public static final String UPLOAD = "Upload";
+ public static final String DOWNLOAD = "Download";
+ public static final String INIT = "Init DVC";
+ };
+
public static final String CHECK_AND_SET_DVC_REPO = "which dvc || { echo \"dvc does not exist\"; exit 1; }; DVC_REPO=%s";
public static final String SET_DATA_PATH = "DVC_DATA_PATH=%s";
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java
index 97c404cb29..ec93e48284 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcParameters.java
@@ -17,15 +17,19 @@
package org.apache.dolphinscheduler.plugin.task.dvc;
+import lombok.Data;
+
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+@Data
public class DvcParameters extends AbstractParameters {
/**
* common parameters
*/
- private TaskTypeEnum dvcTaskType;
+ private String dvcTaskType;
private String dvcRepository;
@@ -39,67 +43,34 @@ public class DvcParameters extends AbstractParameters {
private String dvcStoreUrl;
- public void setDvcTaskType(TaskTypeEnum dvcTaskType) {
- this.dvcTaskType = dvcTaskType;
- }
-
- public TaskTypeEnum getDvcTaskType() {
- return dvcTaskType;
- }
-
- public void setDvcRepository(String dvcRepository) {
- this.dvcRepository = dvcRepository;
- }
-
- public String getDvcRepository() {
- return dvcRepository;
- }
-
- public void setDvcVersion(String dvcVersion) {
- this.dvcVersion = dvcVersion;
- }
-
- public String getDvcVersion() {
- return dvcVersion;
- }
-
- public void setDvcDataLocation(String dvcDataLocation) {
- this.dvcDataLocation = dvcDataLocation;
- }
-
- public String getDvcDataLocation() {
- return dvcDataLocation;
- }
-
- public void setDvcMessage(String dvcMessage) {
- this.dvcMessage = dvcMessage;
- }
-
- public String getDvcMessage() {
- return dvcMessage;
- }
-
- public void setDvcLoadSaveDataPath(String dvcLoadSaveDataPath) {
- this.dvcLoadSaveDataPath = dvcLoadSaveDataPath;
- }
-
- public String getDvcLoadSaveDataPath() {
- return dvcLoadSaveDataPath;
- }
-
- public void setDvcStoreUrl(String dvcStoreUrl) {
- this.dvcStoreUrl = dvcStoreUrl;
- }
-
- public String getDvcStoreUrl() {
- return dvcStoreUrl;
- }
-
@Override
public boolean checkParameters() {
- Boolean checkResult = true;
- return checkResult;
- }
+ if (StringUtils.isEmpty(dvcTaskType)) {
+ return false;
+ }
+
+ switch (dvcTaskType) {
+ case DvcConstants.DVC_TASK_TYPE.UPLOAD:
+ return StringUtils.isNotEmpty(dvcRepository) &&
+ StringUtils.isNotEmpty(dvcDataLocation) &&
+ StringUtils.isNotEmpty(dvcLoadSaveDataPath) &&
+ StringUtils.isNotEmpty(dvcVersion) &&
+ StringUtils.isNotEmpty(dvcMessage);
+
+ case DvcConstants.DVC_TASK_TYPE.DOWNLOAD:
+ return StringUtils.isNotEmpty(dvcRepository) &&
+ StringUtils.isNotEmpty(dvcDataLocation) &&
+ StringUtils.isNotEmpty(dvcLoadSaveDataPath) &&
+ StringUtils.isNotEmpty(dvcVersion);
+
+ case DvcConstants.DVC_TASK_TYPE.INIT:
+ return StringUtils.isNotEmpty(dvcRepository) &&
+ StringUtils.isNotEmpty(dvcStoreUrl);
+
+ default:
+ return false;
+ }
+ }
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
index eb08e55f1a..d049f8360c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
@@ -107,12 +107,12 @@ public class DvcTask extends AbstractTask {
public String buildCommand() {
String command = "";
- TaskTypeEnum taskType = parameters.getDvcTaskType();
- if (taskType == TaskTypeEnum.UPLOAD) {
+ String taskType = parameters.getDvcTaskType();
+ if (taskType.equals(DvcConstants.DVC_TASK_TYPE.UPLOAD)) {
command = buildUploadCommond();
- } else if (taskType == TaskTypeEnum.DOWNLOAD) {
+ } else if (taskType.equals(DvcConstants.DVC_TASK_TYPE.DOWNLOAD)) {
command = buildDownCommond();
- } else if (taskType == TaskTypeEnum.INIT) {
+ } else if (taskType.equals(DvcConstants.DVC_TASK_TYPE.INIT)) {
command = buildInitDvcCommond();
}
logger.info("Run DVC task with command: \n{}", command);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/TaskTypeEnum.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/TaskTypeEnum.java
deleted file mode 100644
index 4cdce3eb23..0000000000
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/TaskTypeEnum.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.dvc;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public enum TaskTypeEnum {
-
- @JsonProperty("Upload")
- UPLOAD,
- @JsonProperty("Download")
- DOWNLOAD,
- @JsonProperty("Init DVC")
- INIT
-}
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
index 2b42c7f77d..ec483d86fb 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
@@ -124,7 +124,7 @@ public class DvcTaskTest {
private DvcParameters createUploadParameters() {
DvcParameters parameters = new DvcParameters();
- parameters.setDvcTaskType(TaskTypeEnum.UPLOAD);
+ parameters.setDvcTaskType(DvcConstants.DVC_TASK_TYPE.UPLOAD);
parameters.setDvcRepository("git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example");
parameters.setDvcLoadSaveDataPath("/home/<YOUR-NAME-OR-ORG>/test");
parameters.setDvcDataLocation("test");
@@ -135,7 +135,7 @@ public class DvcTaskTest {
private DvcParameters createDownloadParameters() {
DvcParameters parameters = new DvcParameters();
- parameters.setDvcTaskType(TaskTypeEnum.DOWNLOAD);
+ parameters.setDvcTaskType(DvcConstants.DVC_TASK_TYPE.DOWNLOAD);
parameters.setDvcRepository("git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example");
parameters.setDvcLoadSaveDataPath("data");
parameters.setDvcDataLocation("iris");
@@ -145,7 +145,7 @@ public class DvcTaskTest {
private DvcParameters createInitDvcParameters() {
DvcParameters parameters = new DvcParameters();
- parameters.setDvcTaskType(TaskTypeEnum.INIT);
+ parameters.setDvcTaskType(DvcConstants.DVC_TASK_TYPE.INIT);
parameters.setDvcRepository("git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example");
parameters.setDvcStoreUrl("~/.dvc_test");
return parameters;