You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/09/06 01:26:38 UTC

[dolphinscheduler] branch dev updated: [Feat][python] Add parameter environment to task (#11763)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8988492c43 [Feat][python] Add parameter environment to task (#11763)
8988492c43 is described below

commit 8988492c4329c93b4872ba3a3059a92582e99bda
Author: JieguangZhou <ji...@163.com>
AuthorDate: Tue Sep 6 09:26:30 2022 +0800

    [Feat][python] Add parameter environment to task (#11763)
---
 .../dolphinscheduler/api/python/PythonGateway.java | 24 ++++++++++++++++++++++
 .../src/pydolphinscheduler/core/task.py            | 11 ++++++++++
 .../src/pydolphinscheduler/java_gateway.py         |  4 ++++
 .../pydolphinscheduler/tests/core/test_engine.py   |  1 +
 .../pydolphinscheduler/tests/core/test_task.py     |  1 +
 .../tests/tasks/test_condition.py                  |  1 +
 .../pydolphinscheduler/tests/tasks/test_datax.py   |  2 ++
 .../tests/tasks/test_dependent.py                  |  1 +
 .../pydolphinscheduler/tests/tasks/test_flink.py   |  1 +
 .../pydolphinscheduler/tests/tasks/test_http.py    |  1 +
 .../tests/tasks/test_map_reduce.py                 |  1 +
 .../tests/tasks/test_procedure.py                  |  1 +
 .../pydolphinscheduler/tests/tasks/test_python.py  |  1 +
 .../tests/tasks/test_sagemaker.py                  |  1 +
 .../pydolphinscheduler/tests/tasks/test_shell.py   |  3 +++
 .../pydolphinscheduler/tests/tasks/test_spark.py   |  1 +
 .../pydolphinscheduler/tests/tasks/test_sql.py     |  1 +
 .../tests/tasks/test_sub_process.py                |  1 +
 .../pydolphinscheduler/tests/tasks/test_switch.py  |  1 +
 19 files changed, 58 insertions(+)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index bcb62f7333..5d0873e6c9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -30,8 +30,10 @@ import javax.annotation.PostConstruct;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.api.configuration.PythonGatewayConfiguration;
+import org.apache.dolphinscheduler.api.dto.EnvironmentDto;
 import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.EnvironmentService;
 import org.apache.dolphinscheduler.api.service.ExecutorService;
 import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
@@ -98,6 +100,9 @@ public class PythonGateway {
     @Autowired
     private TenantService tenantService;
 
+    @Autowired
+    private EnvironmentService environmentService;
+
     @Autowired
     private ExecutorService executorService;
 
@@ -562,6 +567,25 @@ public class PythonGateway {
         return result;
     }
 
+    /**
+     * Get the environment code by the given environment name; throws IllegalArgumentException if none is found.
+     * Useful when the Python API creates a task that needs environment information.
+     *
+     * @param environmentName name of the environment
+     */
+    public Long getEnvironmentInfo(String environmentName) {
+        Map<String, Object> result = environmentService.queryEnvironmentByName(environmentName);
+
+        if (result.get("data") == null) {
+            String msg = String.format("Can not find valid environment by name %s", environmentName);
+            logger.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+        EnvironmentDto environmentDto = EnvironmentDto.class.cast(result.get("data"));
+        return environmentDto.getCode();
+    }
+
+
     /**
      * Get resource by given resource type and full name. It return map contain resource id, name.
      * Useful in Python API create task which need processDefinition information.
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index 93c5f28342..d1a2eae9dd 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -90,6 +90,7 @@ class Task(Base):
         "flag",
         "task_priority",
         "worker_group",
+        "environment_code",
         "delay_time",
         "fail_retry_times",
         "fail_retry_interval",
@@ -110,6 +111,7 @@ class Task(Base):
         flag: Optional[str] = TaskFlag.YES,
         task_priority: Optional[str] = TaskPriority.MEDIUM,
         worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
+        environment_name: Optional[str] = None,
         delay_time: Optional[int] = 0,
         fail_retry_times: Optional[int] = 0,
         fail_retry_interval: Optional[int] = 1,
@@ -129,6 +131,7 @@ class Task(Base):
         self.flag = flag
         self.task_priority = task_priority
         self.worker_group = worker_group
+        self._environment_name = environment_name
         self.fail_retry_times = fail_retry_times
         self.fail_retry_interval = fail_retry_interval
         self.delay_time = delay_time
@@ -145,6 +148,7 @@ class Task(Base):
         # move attribute code and version after _process_definition and process_definition declare
         self.code, self.version = self.gen_code_and_version()
         # Add task to process definition, maybe we could put into property process_definition latter
+
         if (
             self.process_definition is not None
             and self.code not in self.process_definition.tasks
@@ -306,3 +310,10 @@ class Task(Base):
         # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
         # gateway_result_checker(result)
         return result.get("code"), result.get("version")
+
+    @property
+    def environment_code(self) -> Optional[int]:
+        """Return the code of the named environment, or None if no name is set."""
+        if self._environment_name is None:
+            return None
+        return JavaGate().query_environment_info(self._environment_name)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
index 59d411b62c..0ff74ba655 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
@@ -98,6 +98,10 @@ class JavaGate:
         """Get resources file info through java gateway."""
         return self.java_gateway.entry_point.queryResourcesFileInfo(user_name, name)
 
+    def query_environment_info(self, name: str):
+        """Get the environment code for the given environment name through java gateway."""
+        return self.java_gateway.entry_point.getEnvironmentInfo(name)
+
     def get_code_and_version(
         self, project_name: str, process_definition_name: str, task_name: str
     ):
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py
index e36c47ba1b..ba44fad669 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py
@@ -124,6 +124,7 @@ def test_property_task_params(mock_resource, mock_code_version, attr, expect):
                 "flag": "YES",
                 "taskPriority": "MEDIUM",
                 "workerGroup": "default",
+                "environmentCode": None,
                 "failRetryTimes": 0,
                 "failRetryInterval": 1,
                 "timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
index 201bdb30cd..3909077b98 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
@@ -161,6 +161,7 @@ def test_task_get_define():
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
index 523264034a..72eec28ed7 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
@@ -381,6 +381,7 @@ def test_condition_get_define(mock_condition_code_version, mock_task_code_versio
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
index 9473f57321..5d1890e83d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
@@ -67,6 +67,7 @@ def test_datax_get_define(mock_datasource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
@@ -108,6 +109,7 @@ def test_custom_datax_get_define(json_template):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py
index f16e291c82..f55700e04b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py
@@ -782,6 +782,7 @@ def test_dependent_get_define(mock_code_version, mock_dep_code):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
index 92ae3ba91f..2f30a494b9 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
@@ -68,6 +68,7 @@ def test_flink_get_define(mock_resource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
index 060cdec0b0..399829b68c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
@@ -130,6 +130,7 @@ def test_http_get_define():
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
index dbe9e513f5..5d38e93aa4 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
@@ -61,6 +61,7 @@ def test_mr_get_define(mock_resource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py
index 1782593955..80afe7b879 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py
@@ -96,6 +96,7 @@ def test_sql_get_define(mock_datasource, mock_code_version):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
index 1cdd85d2cb..e8f7f10d77 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
@@ -132,6 +132,7 @@ def test_python_get_define(name, script_code, raw):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py
index 8838eaf497..20edc22805 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py
@@ -87,6 +87,7 @@ def test_sagemaker_get_define():
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
index e42f6dc0fb..e2c87d8e7d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
@@ -62,6 +62,7 @@ def test_shell_get_define():
         "name": name,
         "version": 1,
         "description": None,
+        "environmentCode": None,
         "delayTime": 0,
         "taskType": "SHELL",
         "taskParams": {
@@ -75,6 +76,7 @@ def test_shell_get_define():
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
@@ -86,4 +88,5 @@ def test_shell_get_define():
         return_value=(code, version),
     ):
         shell = Shell(name, command)
+        print(shell.get_define())
         assert shell.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
index 3b0672f963..ed83f9f953 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
@@ -68,6 +68,7 @@ def test_spark_get_define(mock_resource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
index 50ccd946a9..ba9daa9b2d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
@@ -152,6 +152,7 @@ def test_sql_get_define(mock_datasource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py
index 7f471a1b8b..126ab1015e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py
@@ -99,6 +99,7 @@ def test_sub_process_get_define(mock_process_definition):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py
index 1f6ff5bfa2..3206b12f7e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py
@@ -250,6 +250,7 @@ def test_switch_get_define(mock_task_code_version):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",