You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/01/04 01:46:02 UTC

[dolphinscheduler] branch dev updated: [Feature-7347] [Python]Add workflow as code task type flink (#7632)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 081adf4  [Feature-7347] [Python]Add workflow as code task type flink (#7632)
081adf4 is described below

commit 081adf4aaa7a11a4068acf123985fb5550138ec9
Author: Devosend <de...@gmail.com>
AuthorDate: Tue Jan 4 09:45:54 2022 +0800

    [Feature-7347] [Python]Add workflow as code task type flink (#7632)
    
    * add Flink task for code as flow
    
    Co-authored-by: Jiajie Zhong <zh...@gmail.com>
---
 .../examples/task_flink_example.py                 |  31 ++++++
 .../src/pydolphinscheduler/constants.py            |   1 +
 .../src/pydolphinscheduler/tasks/flink.py          | 117 +++++++++++++++++++++
 .../pydolphinscheduler/tests/tasks/test_flink.py   |  82 +++++++++++++++
 .../server/PythonGatewayServer.java                |  38 ++++++-
 5 files changed, 267 insertions(+), 2 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_flink_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_flink_example.py
new file mode 100644
index 0000000..1ef259e
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_flink_example.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""An example workflow for task flink."""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.flink import DeployMode, Flink, ProgramType
+
+with ProcessDefinition(name="task_flink_example", tenant="tenant_exists") as pd:
+    task = Flink(
+        name="task_flink",
+        main_class="org.apache.flink.streaming.examples.wordcount.WordCount",
+        main_package="WordCount.jar",
+        program_type=ProgramType.JAVA,
+        deploy_mode=DeployMode.LOCAL,
+    )
+    pd.run()
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 84becbc..4619c91 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -77,6 +77,7 @@ class TaskType(str):
     DEPENDENT = "DEPENDENT"
     CONDITIONS = "CONDITIONS"
     SWITCH = "SWITCH"
+    FLINK = "FLINK"
 
 
 class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py
new file mode 100644
index 0000000..f732a15
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Flink."""
+
+from typing import Dict, Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.java_gateway import launch_gateway
+
+
+class ProgramType(str):
+    """Program type flink runs; for now only `JAVA`, `SCALA` and `PYTHON`."""
+
+    JAVA = "JAVA"
+    SCALA = "SCALA"
+    PYTHON = "PYTHON"
+
+
+class FlinkVersion(str):
+    """Flink version; for now only `LOW_VERSION` and `HIGHT_VERSION`."""
+
+    LOW_VERSION = "<1.10"
+    HIGHT_VERSION = ">=1.10"
+
+
+class DeployMode(str):
+    """Flink deploy mode; for now only `LOCAL` and `CLUSTER`."""
+
+    LOCAL = "local"
+    CLUSTER = "cluster"
+
+
+class Flink(Task):
+    """Task flink object, declare behavior for flink task to dolphinscheduler."""
+
+    _task_custom_attr = {
+        "main_class",
+        "main_jar",
+        "deploy_mode",
+        "flink_version",
+        "slot",
+        "task_manager",
+        "job_manager_memory",
+        "task_manager_memory",
+        "app_name",
+        "program_type",
+        "parallelism",
+        "main_args",
+        "others",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        main_class: str,
+        main_package: str,
+        program_type: Optional[ProgramType] = ProgramType.SCALA,
+        deploy_mode: Optional[DeployMode] = DeployMode.CLUSTER,
+        flink_version: Optional[FlinkVersion] = FlinkVersion.LOW_VERSION,
+        app_name: Optional[str] = None,
+        job_manager_memory: Optional[str] = "1G",
+        task_manager_memory: Optional[str] = "2G",
+        slot: Optional[int] = 1,
+        task_manager: Optional[int] = 2,
+        parallelism: Optional[int] = 1,
+        main_args: Optional[str] = None,
+        others: Optional[str] = None,
+        *args,
+        **kwargs
+    ):
+        super().__init__(name, TaskType.FLINK, *args, **kwargs)
+        self.main_class = main_class
+        self.main_package = main_package
+        self.program_type = program_type
+        self.deploy_mode = deploy_mode
+        self.flink_version = flink_version
+        self.app_name = app_name
+        self.job_manager_memory = job_manager_memory
+        self.task_manager_memory = task_manager_memory
+        self.slot = slot
+        self.task_manager = task_manager
+        self.parallelism = parallelism
+        self.main_args = main_args
+        self.others = others
+        self._resource = {}
+
+    @property
+    def main_jar(self) -> Dict:
+        """Return the main package as a dict holding its resource id."""
+        resource_info = self.get_resource_info(self.program_type, self.main_package)
+        return {"id": resource_info.get("id")}
+
+    def get_resource_info(self, program_type, main_package) -> Dict:
+        """Get resource info (id and name) from java gateway, caching the result."""
+        if not self._resource:
+            self._resource = launch_gateway().entry_point.getResourcesFileInfo(
+                program_type,
+                main_package,
+            )
+
+        return self._resource
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
new file mode 100644
index 0000000..743bdae
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Flink."""
+
+from unittest.mock import patch
+
+from pydolphinscheduler.tasks.flink import DeployMode, Flink, FlinkVersion, ProgramType
+
+
+@patch(
+    "pydolphinscheduler.tasks.flink.Flink.get_resource_info",
+    return_value=({"id": 1, "name": "test"}),
+)
+def test_flink_get_define(mock_resource):
+    """Test task flink function get_define."""
+    code = 123
+    version = 1
+    name = "test_flink_get_define"
+    main_class = "org.apache.flink.test_main_class"
+    main_package = "test_main_package"
+    program_type = ProgramType.JAVA
+    deploy_mode = DeployMode.LOCAL
+
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "FLINK",
+        "taskParams": {
+            "mainClass": main_class,
+            "mainJar": {
+                "id": 1,
+            },
+            "programType": program_type,
+            "deployMode": deploy_mode,
+            "flinkVersion": FlinkVersion.LOW_VERSION,
+            "slot": 1,
+            "parallelism": 1,
+            "taskManager": 2,
+            "jobManagerMemory": "1G",
+            "taskManagerMemory": "2G",
+            "appName": None,
+            "mainArgs": None,
+            "others": None,
+            "localParams": [],
+            "resourceList": [],
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        task = Flink(name, main_class, main_package, program_type, deploy_mode)
+        assert task.get_define() == expect
diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index bc50852..ef6deb4 100644
--- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -17,11 +17,13 @@
 
 package org.apache.dolphinscheduler.server;
 
+import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.ExecutorService;
 import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.api.service.QueueService;
+import org.apache.dolphinscheduler.api.service.ResourcesService;
 import org.apache.dolphinscheduler.api.service.SchedulerService;
 import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
 import org.apache.dolphinscheduler.api.service.TenantService;
@@ -31,6 +33,7 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
+import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.RunMode;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
@@ -50,11 +53,13 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import javax.annotation.PostConstruct;
 
@@ -68,10 +73,12 @@ import org.springframework.context.annotation.ComponentScan;
 
 import py4j.GatewayServer;
 
+import org.apache.commons.collections.CollectionUtils;
+
 @SpringBootApplication
 @ComponentScan(value = "org.apache.dolphinscheduler")
 public class PythonGatewayServer extends SpringBootServletInitializer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(PythonGatewayServer.class);
+    private static final Logger logger = LoggerFactory.getLogger(PythonGatewayServer.class);
 
     private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE;
     private static final int DEFAULT_WARNING_GROUP_ID = 0;
@@ -108,6 +115,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
     private QueueService queueService;
 
     @Autowired
+    private ResourcesService resourceService;
+
+    @Autowired
     private ProjectMapper projectMapper;
 
     @Autowired
@@ -244,7 +254,7 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
             processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
         } else if (verifyStatus != Status.SUCCESS) {
             String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST.";
-            LOGGER.error(msg);
+            logger.error(msg);
             throw new RuntimeException(msg);
         }
 
@@ -465,6 +475,30 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
         return result;
     }
 
+    /**
+     * Get resource by given program type and full name. It returns a map containing the resource id and name.
+     * Useful when the Python API creates a flink task, which needs resource information.
+     *
+     * @param programType program type, one of SCALA, JAVA and PYTHON
+     * @param fullName    full name of the resource
+     */
+    public Map<String, Object> getResourcesFileInfo(String programType, String fullName) {
+        Map<String, Object> result = new HashMap<>();
+
+        Map<String, Object> resources = resourceService.queryResourceByProgramType(dummyAdminUser, ResourceType.FILE, ProgramType.valueOf(programType));
+        List<ResourceComponent> resourcesComponent = (List<ResourceComponent>) resources.get(Constants.DATA_LIST);
+        List<ResourceComponent> namedResources = resourcesComponent.stream().filter(s -> fullName.equals(s.getFullName())).collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(namedResources)) {
+            String msg = String.format("Can not find valid resource by program type %s and name %s", programType, fullName);
+            logger.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+
+        result.put("id", namedResources.get(0).getId());
+        result.put("name", namedResources.get(0).getName());
+        return result;
+    }
+
     @PostConstruct
     public void run() {
         GatewayServer server = new GatewayServer(this);