You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/09/06 01:26:38 UTC
[dolphinscheduler] branch dev updated: [Feat][python] Add parameter environment to task (#11763)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 8988492c43 [Feat][python] Add parameter environment to task (#11763)
8988492c43 is described below
commit 8988492c4329c93b4872ba3a3059a92582e99bda
Author: JieguangZhou <ji...@163.com>
AuthorDate: Tue Sep 6 09:26:30 2022 +0800
[Feat][python] Add parameter environment to task (#11763)
---
.../dolphinscheduler/api/python/PythonGateway.java | 24 ++++++++++++++++++++++
.../src/pydolphinscheduler/core/task.py | 11 ++++++++++
.../src/pydolphinscheduler/java_gateway.py | 4 ++++
.../pydolphinscheduler/tests/core/test_engine.py | 1 +
.../pydolphinscheduler/tests/core/test_task.py | 1 +
.../tests/tasks/test_condition.py | 1 +
.../pydolphinscheduler/tests/tasks/test_datax.py | 2 ++
.../tests/tasks/test_dependent.py | 1 +
.../pydolphinscheduler/tests/tasks/test_flink.py | 1 +
.../pydolphinscheduler/tests/tasks/test_http.py | 1 +
.../tests/tasks/test_map_reduce.py | 1 +
.../tests/tasks/test_procedure.py | 1 +
.../pydolphinscheduler/tests/tasks/test_python.py | 1 +
.../tests/tasks/test_sagemaker.py | 1 +
.../pydolphinscheduler/tests/tasks/test_shell.py | 3 +++
.../pydolphinscheduler/tests/tasks/test_spark.py | 1 +
.../pydolphinscheduler/tests/tasks/test_sql.py | 1 +
.../tests/tasks/test_sub_process.py | 1 +
.../pydolphinscheduler/tests/tasks/test_switch.py | 1 +
19 files changed, 58 insertions(+)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index bcb62f7333..5d0873e6c9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -30,8 +30,10 @@ import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.api.configuration.PythonGatewayConfiguration;
+import org.apache.dolphinscheduler.api.dto.EnvironmentDto;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.EnvironmentService;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProjectService;
@@ -98,6 +100,9 @@ public class PythonGateway {
@Autowired
private TenantService tenantService;
+ @Autowired
+ private EnvironmentService environmentService;
+
@Autowired
private ExecutorService executorService;
@@ -562,6 +567,25 @@ public class PythonGateway {
return result;
}
+ /**
+ * Get environment info by given environment name. It return environment code.
+ * Useful in Python API create task which need environment information.
+ *
+ * @param environmentName name of the environment
+ */
+ public Long getEnvironmentInfo(String environmentName) {
+ Map<String, Object> result = environmentService.queryEnvironmentByName(environmentName);
+
+ if (result.get("data") == null) {
+ String msg = String.format("Can not find valid environment by name %s", environmentName);
+ logger.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ EnvironmentDto environmentDto = EnvironmentDto.class.cast(result.get("data"));
+ return environmentDto.getCode();
+ }
+
+
/**
* Get resource by given resource type and full name. It return map contain resource id, name.
* Useful in Python API create task which need processDefinition information.
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index 93c5f28342..d1a2eae9dd 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -90,6 +90,7 @@ class Task(Base):
"flag",
"task_priority",
"worker_group",
+ "environment_code",
"delay_time",
"fail_retry_times",
"fail_retry_interval",
@@ -110,6 +111,7 @@ class Task(Base):
flag: Optional[str] = TaskFlag.YES,
task_priority: Optional[str] = TaskPriority.MEDIUM,
worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
+ environment_name: Optional[str] = None,
delay_time: Optional[int] = 0,
fail_retry_times: Optional[int] = 0,
fail_retry_interval: Optional[int] = 1,
@@ -129,6 +131,7 @@ class Task(Base):
self.flag = flag
self.task_priority = task_priority
self.worker_group = worker_group
+ self._environment_name = environment_name
self.fail_retry_times = fail_retry_times
self.fail_retry_interval = fail_retry_interval
self.delay_time = delay_time
@@ -145,6 +148,7 @@ class Task(Base):
# move attribute code and version after _process_definition and process_definition declare
self.code, self.version = self.gen_code_and_version()
# Add task to process definition, maybe we could put into property process_definition latter
+
if (
self.process_definition is not None
and self.code not in self.process_definition.tasks
@@ -306,3 +310,10 @@ class Task(Base):
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
# gateway_result_checker(result)
return result.get("code"), result.get("version")
+
+ @property
+ def environment_code(self) -> str:
+ """Convert environment name to code."""
+ if self._environment_name is None:
+ return None
+ return JavaGate().query_environment_info(self._environment_name)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
index 59d411b62c..0ff74ba655 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
@@ -98,6 +98,10 @@ class JavaGate:
"""Get resources file info through java gateway."""
return self.java_gateway.entry_point.queryResourcesFileInfo(user_name, name)
+ def query_environment_info(self, name: str):
+ """Get environment info through java gateway."""
+ return self.java_gateway.entry_point.getEnvironmentInfo(name)
+
def get_code_and_version(
self, project_name: str, process_definition_name: str, task_name: str
):
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py
index e36c47ba1b..ba44fad669 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py
@@ -124,6 +124,7 @@ def test_property_task_params(mock_resource, mock_code_version, attr, expect):
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
index 201bdb30cd..3909077b98 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
@@ -161,6 +161,7 @@ def test_task_get_define():
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
index 523264034a..72eec28ed7 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
@@ -381,6 +381,7 @@ def test_condition_get_define(mock_condition_code_version, mock_task_code_versio
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
index 9473f57321..5d1890e83d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
@@ -67,6 +67,7 @@ def test_datax_get_define(mock_datasource):
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
@@ -108,6 +109,7 @@ def test_custom_datax_get_define(json_template):
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py
index f16e291c82..f55700e04b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py
@@ -782,6 +782,7 @@ def test_dependent_get_define(mock_code_version, mock_dep_code):
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
index 92ae3ba91f..2f30a494b9 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
@@ -68,6 +68,7 @@ def test_flink_get_define(mock_resource):
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
index 060cdec0b0..399829b68c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
@@ -130,6 +130,7 @@ def test_http_get_define():
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
index dbe9e513f5..5d38e93aa4 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
@@ -61,6 +61,7 @@ def test_mr_get_define(mock_resource):
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py
index 1782593955..80afe7b879 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py
@@ -96,6 +96,7 @@ def test_sql_get_define(mock_datasource, mock_code_version):
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
index 1cdd85d2cb..e8f7f10d77 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
@@ -132,6 +132,7 @@ def test_python_get_define(name, script_code, raw):
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py
index 8838eaf497..20edc22805 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py
@@ -87,6 +87,7 @@ def test_sagemaker_get_define():
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
index e42f6dc0fb..e2c87d8e7d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
@@ -62,6 +62,7 @@ def test_shell_get_define():
"name": name,
"version": 1,
"description": None,
+ "environmentCode": None,
"delayTime": 0,
"taskType": "SHELL",
"taskParams": {
@@ -75,6 +76,7 @@ def test_shell_get_define():
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
@@ -86,4 +88,5 @@ def test_shell_get_define():
return_value=(code, version),
):
shell = Shell(name, command)
+ print(shell.get_define())
assert shell.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
index 3b0672f963..ed83f9f953 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
@@ -68,6 +68,7 @@ def test_spark_get_define(mock_resource):
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
index 50ccd946a9..ba9daa9b2d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
@@ -152,6 +152,7 @@ def test_sql_get_define(mock_datasource):
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py
index 7f471a1b8b..126ab1015e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py
@@ -99,6 +99,7 @@ def test_sub_process_get_define(mock_process_definition):
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py
index 1f6ff5bfa2..3206b12f7e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py
@@ -250,6 +250,7 @@ def test_switch_get_define(mock_task_code_version):
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
+ "environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",