You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/16 08:41:34 UTC
[dolphinscheduler-sdk-python] branch main updated: [Test] Optimize the test cases in the task plugin (#46)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new f82e07d [Test] Optimize the test cases in the task plugin (#46)
f82e07d is described below
commit f82e07d232380fe9f5582db4dcca920ae00befd4
Author: JieguangZhou <ji...@163.com>
AuthorDate: Fri Dec 16 16:41:30 2022 +0800
[Test] Optimize the test cases in the task plugin (#46)
---
tests/tasks/test_condition.py | 84 +++++++++++----------------
tests/tasks/test_datax.py | 94 ++++++++++--------------------
tests/tasks/test_dependent.py | 85 +++++++++++----------------
tests/tasks/test_dvc.py | 122 +++++++++++----------------------------
tests/tasks/test_flink.py | 59 +++++++------------
tests/tasks/test_http.py | 45 +++++----------
tests/tasks/test_kubernetes.py | 39 ++++---------
tests/tasks/test_map_reduce.py | 45 +++++----------
tests/tasks/test_mlflow.py | 54 +++++------------
tests/tasks/test_openmldb.py | 40 ++++---------
tests/tasks/test_procedure.py | 37 ++++--------
tests/tasks/test_python.py | 35 +++--------
tests/tasks/test_pytorch.py | 2 +-
tests/tasks/test_sagemaker.py | 33 +++--------
tests/tasks/test_shell.py | 35 +++--------
tests/tasks/test_spark.py | 57 +++++++-----------
tests/tasks/test_sql.py | 45 +++++----------
tests/tasks/test_sub_workflow.py | 66 +++++----------------
tests/tasks/test_switch.py | 39 ++++---------
19 files changed, 313 insertions(+), 703 deletions(-)
diff --git a/tests/tasks/test_condition.py b/tests/tasks/test_condition.py
index 3381783..217c727 100644
--- a/tests/tasks/test_condition.py
+++ b/tests/tasks/test_condition.py
@@ -339,60 +339,42 @@ def test_condition_get_define(mock_condition_code_version, mock_task_code_versio
)
name = "test_condition_get_define"
- expect = {
- "code": 123,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "CONDITIONS",
- "taskParams": {
- "resourceList": [],
- "localParams": [],
- "dependence": {
- "relation": "AND",
- "dependTaskList": [
- {
- "relation": "AND",
- "dependItemList": [
- {"depTaskCode": common_task.code, "status": "SUCCESS"},
- {"depTaskCode": common_task.code, "status": "SUCCESS"},
- {"depTaskCode": common_task.code, "status": "FAILURE"},
- {"depTaskCode": common_task.code, "status": "FAILURE"},
- ],
- },
- {
- "relation": "OR",
- "dependItemList": [
- {"depTaskCode": common_task.code, "status": "SUCCESS"},
- {"depTaskCode": common_task.code, "status": "SUCCESS"},
- {"depTaskCode": common_task.code, "status": "FAILURE"},
- {"depTaskCode": common_task.code, "status": "FAILURE"},
- ],
- },
- ],
- },
- "conditionResult": {
- "successNode": [common_task.code],
- "failedNode": [common_task.code],
- },
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
- }
-
task = Condition(
name, condition=cond_operator, success_task=common_task, failed_task=common_task
)
- assert task.get_define() == expect
+ expect_task_parmas = {
+ "resourceList": [],
+ "localParams": [],
+ "dependence": {
+ "relation": "AND",
+ "dependTaskList": [
+ {
+ "relation": "AND",
+ "dependItemList": [
+ {"depTaskCode": common_task.code, "status": "SUCCESS"},
+ {"depTaskCode": common_task.code, "status": "SUCCESS"},
+ {"depTaskCode": common_task.code, "status": "FAILURE"},
+ {"depTaskCode": common_task.code, "status": "FAILURE"},
+ ],
+ },
+ {
+ "relation": "OR",
+ "dependItemList": [
+ {"depTaskCode": common_task.code, "status": "SUCCESS"},
+ {"depTaskCode": common_task.code, "status": "SUCCESS"},
+ {"depTaskCode": common_task.code, "status": "FAILURE"},
+ {"depTaskCode": common_task.code, "status": "FAILURE"},
+ ],
+ },
+ ],
+ },
+ "conditionResult": {
+ "successNode": [common_task.code],
+ "failedNode": [common_task.code],
+ },
+ "waitStartTimeout": {},
+ }
+ assert task.task_params == expect_task_parmas
@patch(
diff --git a/tests/tasks/test_datax.py b/tests/tasks/test_datax.py
index 95f65b3..d1143bb 100644
--- a/tests/tasks/test_datax.py
+++ b/tests/tasks/test_datax.py
@@ -53,65 +53,43 @@ def test_datax_get_define(mock_datasource):
datasource_name = "test_datasource"
datatarget_name = "test_datatarget"
target_table = "test_target_table_name"
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "DATAX",
- "taskParams": {
- "customConfig": 0,
- "dsType": "MYSQL",
- "dataSource": 1,
- "dtType": "MYSQL",
- "dataTarget": 1,
- "sql": command,
- "targetTable": target_table,
- "jobSpeedByte": 0,
- "jobSpeedRecord": 1000,
- "xms": 1,
- "xmx": 1,
- "preStatements": [],
- "postStatements": [],
- "localParams": [],
- "resourceList": [],
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ expect_task_params = {
+ "customConfig": 0,
+ "dsType": "MYSQL",
+ "dataSource": 1,
+ "dtType": "MYSQL",
+ "dataTarget": 1,
+ "sql": command,
+ "targetTable": target_table,
+ "jobSpeedByte": 0,
+ "jobSpeedRecord": 1000,
+ "xms": 1,
+ "xmx": 1,
+ "preStatements": [],
+ "postStatements": [],
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
task = DataX(name, datasource_name, datatarget_name, command, target_table)
- assert task.get_define() == expect
+ assert task.task_params == expect_task_params
@pytest.mark.parametrize("json_template", ["json_template"])
def test_custom_datax_get_define(json_template):
"""Test task custom datax function get_define."""
- code = 123
- version = 1
- name = "test_custom_datax_get_define"
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "DATAX",
- "taskParams": {
+ with patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+ ):
+ task = CustomDataX("test_custom_datax_get_define", json_template)
+ expect_task_params = {
"customConfig": 1,
"json": json_template,
"xms": 1,
@@ -121,23 +99,9 @@ def test_custom_datax_get_define(json_template):
"dependence": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
"waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
- }
- with patch(
- "pydolphinscheduler.core.task.Task.gen_code_and_version",
- return_value=(code, version),
- ):
- task = CustomDataX(name, json_template)
- assert task.get_define() == expect
+ }
+
+ assert task.task_params == expect_task_params
@pytest.mark.parametrize(
diff --git a/tests/tasks/test_dependent.py b/tests/tasks/test_dependent.py
index 63dfd54..e35ca87 100644
--- a/tests/tasks/test_dependent.py
+++ b/tests/tasks/test_dependent.py
@@ -769,58 +769,41 @@ def test_dependent_get_define(mock_code_version, mock_dep_code):
)
name = "test_dependent_get_define"
- expect = {
- "code": 123,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "DEPENDENT",
- "taskParams": {
- "resourceList": [],
- "localParams": [],
- "dependence": {
- "relation": "AND",
- "dependTaskList": [
- {
- "relation": "OR",
- "dependItemList": [
- {
- "projectCode": TEST_PROJECT_CODE,
- "definitionCode": TEST_DEFINITION_CODE,
- "depTaskCode": "0",
- "cycle": "day",
- "dateValue": "today",
- }
- ],
- },
- {
- "relation": "AND",
- "dependItemList": [
- {
- "projectCode": TEST_PROJECT_CODE,
- "definitionCode": TEST_DEFINITION_CODE,
- "depTaskCode": TEST_TASK_CODE,
- "cycle": "day",
- "dateValue": "today",
- }
- ],
- },
- ],
- },
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
+ expect_task_params = {
+ "resourceList": [],
+ "localParams": [],
+ "dependence": {
+ "relation": "AND",
+ "dependTaskList": [
+ {
+ "relation": "OR",
+ "dependItemList": [
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": "0",
+ "cycle": "day",
+ "dateValue": "today",
+ }
+ ],
+ },
+ {
+ "relation": "AND",
+ "dependItemList": [
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "day",
+ "dateValue": "today",
+ }
+ ],
+ },
+ ],
},
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
task = Dependent(name, dependence=dep_operator)
- assert task.get_define() == expect
+ assert task.task_params == expect_task_params
diff --git a/tests/tasks/test_dvc.py b/tests/tasks/test_dvc.py
index 815d896..a297228 100644
--- a/tests/tasks/test_dvc.py
+++ b/tests/tasks/test_dvc.py
@@ -18,7 +18,6 @@
"""Test Task Dvc."""
from unittest.mock import patch
-from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.tasks.dvc import DVCDownload, DVCInit, DvcTaskType, DVCUpload
repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"
@@ -31,39 +30,22 @@ def test_dvc_init_get_define():
code = 123
version = 1
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": TaskType.DVC,
- "taskParams": {
- "resourceList": [],
- "localParams": [],
- "dvcTaskType": DvcTaskType.INIT,
- "dvcRepository": repository,
- "dvcStoreUrl": dvc_store_url,
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ expect_task_params = {
+ "resourceList": [],
+ "localParams": [],
+ "dvcTaskType": DvcTaskType.INIT,
+ "dvcRepository": repository,
+ "dvcStoreUrl": dvc_store_url,
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
dvc_init = DVCInit(name, repository, dvc_store_url)
- assert dvc_init.get_define() == expect
+ assert dvc_init.task_params == expect_task_params
def test_dvc_upload_get_define():
@@ -76,35 +58,18 @@ def test_dvc_upload_get_define():
code = 123
version = 1
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": TaskType.DVC,
- "taskParams": {
- "resourceList": [],
- "localParams": [],
- "dvcTaskType": DvcTaskType.UPLOAD,
- "dvcRepository": repository,
- "dvcDataLocation": data_path_in_dvc_repository,
- "dvcLoadSaveDataPath": data_path_in_worker,
- "dvcVersion": version,
- "dvcMessage": message,
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ expect_task_params = {
+ "resourceList": [],
+ "localParams": [],
+ "dvcTaskType": DvcTaskType.UPLOAD,
+ "dvcRepository": repository,
+ "dvcDataLocation": data_path_in_dvc_repository,
+ "dvcLoadSaveDataPath": data_path_in_worker,
+ "dvcVersion": version,
+ "dvcMessage": message,
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
@@ -118,7 +83,7 @@ def test_dvc_upload_get_define():
version=version,
message=message,
)
- assert dvc_upload.get_define() == expect
+ assert dvc_upload.task_params == expect_task_params
def test_dvc_download_get_define():
@@ -130,34 +95,17 @@ def test_dvc_download_get_define():
code = 123
version = 1
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": TaskType.DVC,
- "taskParams": {
- "resourceList": [],
- "localParams": [],
- "dvcTaskType": DvcTaskType.DOWNLOAD,
- "dvcRepository": repository,
- "dvcDataLocation": data_path_in_dvc_repository,
- "dvcLoadSaveDataPath": data_path_in_worker,
- "dvcVersion": version,
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ expect_task_params = {
+ "resourceList": [],
+ "localParams": [],
+ "dvcTaskType": DvcTaskType.DOWNLOAD,
+ "dvcRepository": repository,
+ "dvcDataLocation": data_path_in_dvc_repository,
+ "dvcLoadSaveDataPath": data_path_in_worker,
+ "dvcVersion": version,
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
@@ -170,4 +118,4 @@ def test_dvc_download_get_define():
data_path_in_worker=data_path_in_worker,
version=version,
)
- assert dvc_download.get_define() == expect
+ assert dvc_download.task_params == expect_task_params
diff --git a/tests/tasks/test_flink.py b/tests/tasks/test_flink.py
index 2f30a49..ad7bf41 100644
--- a/tests/tasks/test_flink.py
+++ b/tests/tasks/test_flink.py
@@ -36,48 +36,31 @@ def test_flink_get_define(mock_resource):
program_type = ProgramType.JAVA
deploy_mode = DeployMode.LOCAL
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "FLINK",
- "taskParams": {
- "mainClass": main_class,
- "mainJar": {
- "id": 1,
- },
- "programType": program_type,
- "deployMode": deploy_mode,
- "flinkVersion": FlinkVersion.LOW_VERSION,
- "slot": 1,
- "parallelism": 1,
- "taskManager": 2,
- "jobManagerMemory": "1G",
- "taskManagerMemory": "2G",
- "appName": None,
- "mainArgs": None,
- "others": None,
- "localParams": [],
- "resourceList": [],
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
+ expect_task_params = {
+ "mainClass": main_class,
+ "mainJar": {
+ "id": 1,
},
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ "programType": program_type,
+ "deployMode": deploy_mode,
+ "flinkVersion": FlinkVersion.LOW_VERSION,
+ "slot": 1,
+ "parallelism": 1,
+ "taskManager": 2,
+ "jobManagerMemory": "1G",
+ "taskManagerMemory": "2G",
+ "appName": None,
+ "mainArgs": None,
+ "others": None,
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
task = Flink(name, main_class, main_package, program_type, deploy_mode)
- assert task.get_define() == expect
+ assert task.task_params == expect_task_params
diff --git a/tests/tasks/test_http.py b/tests/tasks/test_http.py
index 399829b..1d13fa6 100644
--- a/tests/tasks/test_http.py
+++ b/tests/tasks/test_http.py
@@ -106,40 +106,23 @@ def test_http_get_define():
version = 1
name = "test_http_get_define"
url = "https://www.apache.org"
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "HTTP",
- "taskParams": {
- "localParams": [],
- "httpParams": [],
- "url": url,
- "httpMethod": "GET",
- "httpCheckCondition": "STATUS_CODE_DEFAULT",
- "condition": None,
- "connectTimeout": 60000,
- "socketTimeout": 60000,
- "dependence": {},
- "resourceList": [],
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ expect_task_params = {
+ "localParams": [],
+ "httpParams": [],
+ "url": url,
+ "httpMethod": "GET",
+ "httpCheckCondition": "STATUS_CODE_DEFAULT",
+ "condition": None,
+ "connectTimeout": 60000,
+ "socketTimeout": 60000,
+ "dependence": {},
+ "resourceList": [],
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
http = Http(name, url)
- assert http.get_define() == expect
+ assert http.task_params == expect_task_params
diff --git a/tests/tasks/test_kubernetes.py b/tests/tasks/test_kubernetes.py
index d9b5f75..c5292c8 100644
--- a/tests/tasks/test_kubernetes.py
+++ b/tests/tasks/test_kubernetes.py
@@ -32,37 +32,20 @@ def test_kubernetes_get_define():
minCpuCores = 2.0
minMemorySpace = 10.0
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "K8S",
- "taskParams": {
- "resourceList": [],
- "localParams": [],
- "image": image,
- "namespace": namespace,
- "minCpuCores": minCpuCores,
- "minMemorySpace": minMemorySpace,
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ expect_task_params = {
+ "resourceList": [],
+ "localParams": [],
+ "image": image,
+ "namespace": namespace,
+ "minCpuCores": minCpuCores,
+ "minMemorySpace": minMemorySpace,
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
k8s = Kubernetes(name, image, namespace, minCpuCores, minMemorySpace)
- assert k8s.get_define() == expect
+ assert k8s.task_params == expect_task_params
diff --git a/tests/tasks/test_map_reduce.py b/tests/tasks/test_map_reduce.py
index 5d38e93..e08affd 100644
--- a/tests/tasks/test_map_reduce.py
+++ b/tests/tasks/test_map_reduce.py
@@ -36,41 +36,24 @@ def test_mr_get_define(mock_resource):
program_type = ProgramType.JAVA
main_args = "/dolphinscheduler/resources/file.txt /output/ds"
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "MR",
- "taskParams": {
- "mainClass": main_class,
- "mainJar": {
- "id": 1,
- },
- "programType": program_type,
- "appName": None,
- "mainArgs": main_args,
- "others": None,
- "localParams": [],
- "resourceList": [],
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
+ expect_task_params = {
+ "mainClass": main_class,
+ "mainJar": {
+ "id": 1,
},
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ "programType": program_type,
+ "appName": None,
+ "mainArgs": main_args,
+ "others": None,
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
task = MR(name, main_class, main_package, program_type, main_args=main_args)
- assert task.get_define() == expect
+ assert task.task_params == expect_task_params
diff --git a/tests/tasks/test_mlflow.py b/tests/tasks/test_mlflow.py
index af0a324..9e6517e 100644
--- a/tests/tasks/test_mlflow.py
+++ b/tests/tasks/test_mlflow.py
@@ -34,27 +34,11 @@ VERSION = 1
MLFLOW_TRACKING_URI = "http://127.0.0.1:5000"
EXPECT = {
- "code": CODE,
- "version": VERSION,
- "description": None,
- "delayTime": 0,
- "taskType": "MLFLOW",
- "taskParams": {
- "resourceList": [],
- "localParams": [],
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ "resourceList": [],
+ "localParams": [],
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
@@ -64,9 +48,7 @@ def test_mlflow_models_get_define():
model_uri = "models:/xgboost_native/Production"
port = 7001
- expect = deepcopy(EXPECT)
- expect["name"] = name
- task_params = expect["taskParams"]
+ task_params = deepcopy(EXPECT)
task_params["mlflowTrackingUri"] = MLFLOW_TRACKING_URI
task_params["mlflowTaskType"] = MLflowTaskType.MLFLOW_MODELS
task_params["deployType"] = MLflowDeployType.DOCKER
@@ -84,7 +66,8 @@ def test_mlflow_models_get_define():
deploy_mode=MLflowDeployType.DOCKER,
port=port,
)
- assert task.get_define() == expect
+ print(task_params)
+ assert task.task_params == task_params
def test_mlflow_project_custom_get_define():
@@ -95,10 +78,7 @@ def test_mlflow_project_custom_get_define():
parameters = "-P learning_rate=0.2 -P colsample_bytree=0.8 -P subsample=0.9"
experiment_name = "xgboost"
- expect = deepcopy(EXPECT)
- expect["name"] = name
- task_params = expect["taskParams"]
-
+ task_params = deepcopy(EXPECT)
task_params["mlflowTrackingUri"] = MLFLOW_TRACKING_URI
task_params["mlflowTaskType"] = MLflowTaskType.MLFLOW_PROJECTS
task_params["mlflowJobType"] = MLflowJobType.CUSTOM_PROJECT
@@ -119,7 +99,7 @@ def test_mlflow_project_custom_get_define():
experiment_name=experiment_name,
version="dev",
)
- assert task.get_define() == expect
+ assert task.task_params == task_params
def test_mlflow_project_automl_get_define():
@@ -132,10 +112,7 @@ def test_mlflow_project_automl_get_define():
automl_tool = "flaml"
data_path = "/data/examples/iris"
- expect = deepcopy(EXPECT)
- expect["name"] = name
- task_params = expect["taskParams"]
-
+ task_params = deepcopy(EXPECT)
task_params["mlflowTrackingUri"] = MLFLOW_TRACKING_URI
task_params["mlflowTaskType"] = MLflowTaskType.MLFLOW_PROJECTS
task_params["mlflowJobType"] = MLflowJobType.AUTOML
@@ -159,7 +136,7 @@ def test_mlflow_project_automl_get_define():
automl_tool=automl_tool,
data_path=data_path,
)
- assert task.get_define() == expect
+ assert task.task_params == task_params
def test_mlflow_project_basic_algorithm_get_define():
@@ -173,10 +150,7 @@ def test_mlflow_project_basic_algorithm_get_define():
data_path = "/data/examples/iris"
search_params = "max_depth=[5, 10];n_estimators=[100, 200]"
- expect = deepcopy(EXPECT)
- expect["name"] = name
- task_params = expect["taskParams"]
-
+ task_params = deepcopy(EXPECT)
task_params["mlflowTrackingUri"] = MLFLOW_TRACKING_URI
task_params["mlflowTaskType"] = MLflowTaskType.MLFLOW_PROJECTS
task_params["mlflowJobType"] = MLflowJobType.BASIC_ALGORITHM
@@ -202,4 +176,4 @@ def test_mlflow_project_basic_algorithm_get_define():
data_path=data_path,
search_params=search_params,
)
- assert task.get_define() == expect
+ assert task.task_params == task_params
diff --git a/tests/tasks/test_openmldb.py b/tests/tasks/test_openmldb.py
index f580ab0..5675bce 100644
--- a/tests/tasks/test_openmldb.py
+++ b/tests/tasks/test_openmldb.py
@@ -18,7 +18,6 @@
"""Test Task OpenMLDB."""
from unittest.mock import patch
-from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.tasks.openmldb import OpenMLDB
@@ -37,37 +36,20 @@ def test_openmldb_get_define():
code = 123
version = 1
name = "test_openmldb_get_define"
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": TaskType.OPENMLDB,
- "taskParams": {
- "resourceList": [],
- "localParams": [],
- "zk": zookeeper,
- "zkPath": zookeeper_path,
- "executeMode": execute_mode,
- "sql": sql,
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ expect_task_params = {
+ "resourceList": [],
+ "localParams": [],
+ "zk": zookeeper,
+ "zkPath": zookeeper_path,
+ "executeMode": execute_mode,
+ "sql": sql,
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
openmldb = OpenMLDB(name, zookeeper, zookeeper_path, execute_mode, sql)
- assert openmldb.get_define() == expect
+ assert openmldb.task_params == expect_task_params
diff --git a/tests/tasks/test_procedure.py b/tests/tasks/test_procedure.py
index 80afe7b..02bfaf5 100644
--- a/tests/tasks/test_procedure.py
+++ b/tests/tasks/test_procedure.py
@@ -76,32 +76,15 @@ def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
def test_sql_get_define(mock_datasource, mock_code_version):
"""Test task procedure function get_define."""
name = "test_procedure_get_define"
- expect = {
- "code": 123,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "PROCEDURE",
- "taskParams": {
- "type": "MYSQL",
- "datasource": 1,
- "method": TEST_PROCEDURE_SQL,
- "localParams": [],
- "resourceList": [],
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ expect_task_params = {
+ "type": "MYSQL",
+ "datasource": 1,
+ "method": TEST_PROCEDURE_SQL,
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
task = Procedure(name, TEST_PROCEDURE_DATASOURCE_NAME, TEST_PROCEDURE_SQL)
- assert task.get_define() == expect
+ assert task.task_params == expect_task_params
diff --git a/tests/tasks/test_python.py b/tests/tasks/test_python.py
index 77aa106..f237619 100644
--- a/tests/tasks/test_python.py
+++ b/tests/tasks/test_python.py
@@ -132,38 +132,21 @@ def test_python_get_define(name, script_code, raw):
"""Test task python function get_define."""
code = 123
version = 1
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "PYTHON",
- "taskParams": {
- "definition": script_code,
- "resourceList": [],
- "localParams": [],
- "rawScript": raw,
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ expect_task_params = {
+ "definition": script_code,
+ "resourceList": [],
+ "localParams": [],
+ "rawScript": raw,
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
shell = Python(name, script_code)
- assert shell.get_define() == expect
+ assert shell.task_params == expect_task_params
@pytest.mark.parametrize(
diff --git a/tests/tasks/test_pytorch.py b/tests/tasks/test_pytorch.py
index eccb51c..c04723e 100644
--- a/tests/tasks/test_pytorch.py
+++ b/tests/tasks/test_pytorch.py
@@ -90,7 +90,7 @@ def test_pytorch_get_define():
python_env_tool=python_env_tool,
requirements=requirements,
)
- assert task.get_define() == expect
+ assert task.task_params == task_params
@pytest.mark.parametrize(
diff --git a/tests/tasks/test_sagemaker.py b/tests/tasks/test_sagemaker.py
index 20edc22..ec0c42e 100644
--- a/tests/tasks/test_sagemaker.py
+++ b/tests/tasks/test_sagemaker.py
@@ -69,34 +69,17 @@ def test_sagemaker_get_define():
code = 123
version = 1
name = "test_sagemaker_get_define"
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "SAGEMAKER",
- "taskParams": {
- "resourceList": [],
- "localParams": [],
- "sagemakerRequestJson": sagemaker_request_json,
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ expect_task_params = {
+ "resourceList": [],
+ "localParams": [],
+ "sagemakerRequestJson": sagemaker_request_json,
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
sagemaker = SageMaker(name, sagemaker_request_json)
- assert sagemaker.get_define() == expect
+ assert sagemaker.task_params == expect_task_params
diff --git a/tests/tasks/test_shell.py b/tests/tasks/test_shell.py
index 9344ac2..3932c08 100644
--- a/tests/tasks/test_shell.py
+++ b/tests/tasks/test_shell.py
@@ -73,39 +73,20 @@ def test_shell_get_define():
version = 1
name = "test_shell_get_define"
command = "echo test shell"
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "environmentCode": None,
- "delayTime": 0,
- "taskType": "SHELL",
- "taskParams": {
- "resourceList": [],
- "localParams": [],
- "rawScript": command,
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ expect_task_params = {
+ "resourceList": [],
+ "localParams": [],
+ "rawScript": command,
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
shell = Shell(name, command)
- print(shell.get_define())
- assert shell.get_define() == expect
+ assert shell.task_params == expect_task_params
@pytest.mark.parametrize(
diff --git a/tests/tasks/test_spark.py b/tests/tasks/test_spark.py
index 1fdb1fa..eeebc5c 100644
--- a/tests/tasks/test_spark.py
+++ b/tests/tasks/test_spark.py
@@ -36,47 +36,30 @@ def test_spark_get_define(mock_resource):
program_type = ProgramType.JAVA
deploy_mode = DeployMode.LOCAL
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "SPARK",
- "taskParams": {
- "mainClass": main_class,
- "mainJar": {
- "id": 1,
- },
- "programType": program_type,
- "deployMode": deploy_mode,
- "driverCores": 1,
- "driverMemory": "512M",
- "numExecutors": 2,
- "executorMemory": "2G",
- "executorCores": 2,
- "appName": None,
- "mainArgs": None,
- "others": None,
- "localParams": [],
- "resourceList": [],
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
+ expect_task_params = {
+ "mainClass": main_class,
+ "mainJar": {
+ "id": 1,
},
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ "programType": program_type,
+ "deployMode": deploy_mode,
+ "driverCores": 1,
+ "driverMemory": "512M",
+ "numExecutors": 2,
+ "executorMemory": "2G",
+ "executorCores": 2,
+ "appName": None,
+ "mainArgs": None,
+ "others": None,
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
task = Spark(name, main_class, main_package, program_type, deploy_mode)
- assert task.get_define() == expect
+ assert task.task_params == expect_task_params
diff --git a/tests/tasks/test_sql.py b/tests/tasks/test_sql.py
index a22d920..b9de05f 100644
--- a/tests/tasks/test_sql.py
+++ b/tests/tasks/test_sql.py
@@ -143,43 +143,26 @@ def test_sql_get_define(mock_datasource):
name = "test_sql_get_define"
command = "select 1"
datasource_name = "test_datasource"
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "SQL",
- "taskParams": {
- "type": "MYSQL",
- "datasource": 1,
- "sql": command,
- "sqlType": "0",
- "displayRows": 10,
- "preStatements": [],
- "postStatements": [],
- "localParams": [],
- "resourceList": [],
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ expect_task_params = {
+ "type": "MYSQL",
+ "datasource": 1,
+ "sql": command,
+ "sqlType": "0",
+ "displayRows": 10,
+ "preStatements": [],
+ "postStatements": [],
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
task = Sql(name, datasource_name, command)
- assert task.get_define() == expect
+ assert task.task_params == expect_task_params
@pytest.mark.parametrize(
diff --git a/tests/tasks/test_sub_workflow.py b/tests/tasks/test_sub_workflow.py
index fc18e32..5963698 100644
--- a/tests/tasks/test_sub_workflow.py
+++ b/tests/tasks/test_sub_workflow.py
@@ -80,30 +80,13 @@ def test_sub_workflow_get_define(mock_workflow_definition):
code = 123
version = 1
name = "test_sub_workflow_get_define"
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "SUB_PROCESS",
- "taskParams": {
- "resourceList": [],
- "localParams": [],
- "processDefinitionCode": TEST_SUB_WORKFLOW_CODE,
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ expect_task_params = {
+ "resourceList": [],
+ "localParams": [],
+ "processDefinitionCode": TEST_SUB_WORKFLOW_CODE,
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
@@ -111,7 +94,7 @@ def test_sub_workflow_get_define(mock_workflow_definition):
):
with Workflow(TEST_WORKFLOW_NAME):
sub_workflow = SubWorkflow(name, TEST_SUB_WORKFLOW_NAME)
- assert sub_workflow.get_define() == expect
+ assert sub_workflow.task_params == expect_task_params
@patch(
@@ -129,30 +112,13 @@ def test_deprecated_sub_workflow_get_define(mock_workflow_definition):
code = 123
version = 1
name = "test_sub_workflow_get_define"
- expect = {
- "code": code,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "SUB_PROCESS",
- "taskParams": {
- "resourceList": [],
- "localParams": [],
- "processDefinitionCode": TEST_SUB_WORKFLOW_CODE,
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
+ expect_task_params = {
+ "resourceList": [],
+ "localParams": [],
+ "processDefinitionCode": TEST_SUB_WORKFLOW_CODE,
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
@@ -167,4 +133,4 @@ def test_deprecated_sub_workflow_get_define(mock_workflow_definition):
with Workflow(TEST_WORKFLOW_NAME):
sub_workflow = SubProcess(name, TEST_SUB_WORKFLOW_NAME)
- assert sub_workflow.get_define() == expect
+ assert sub_workflow.task_params == expect_task_params
diff --git a/tests/tasks/test_switch.py b/tests/tasks/test_switch.py
index 677ba41..dcd5e40 100644
--- a/tests/tasks/test_switch.py
+++ b/tests/tasks/test_switch.py
@@ -226,38 +226,21 @@ def test_switch_get_define(mock_task_code_version):
)
name = "test_switch_get_define"
- expect = {
- "code": 123,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "SWITCH",
- "taskParams": {
- "resourceList": [],
- "localParams": [],
- "waitStartTimeout": {},
- "switchResult": {
- "dependTaskList": [
- {"condition": "${var1} > 1", "nextNode": task.code},
- {"condition": "${var1} <= 1", "nextNode": task.code},
- ],
- "nextNode": task.code,
- },
+ expect_task_params = {
+ "resourceList": [],
+ "localParams": [],
+ "waitStartTimeout": {},
+ "switchResult": {
+ "dependTaskList": [
+ {"condition": "${var1} > 1", "nextNode": task.code},
+ {"condition": "${var1} <= 1", "nextNode": task.code},
+ ],
+ "nextNode": task.code,
},
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "environmentCode": None,
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
}
task = Switch(name, condition=switch_condition)
- assert task.get_define() == expect
+ assert task.task_params == expect_task_params
@patch(