You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/01/04 01:46:02 UTC
[dolphinscheduler] branch dev updated: [Feature-7347] [Python]Add workflow as code task type flink (#7632)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 081adf4 [Feature-7347] [Python]Add workflow as code task type flink (#7632)
081adf4 is described below
commit 081adf4aaa7a11a4068acf123985fb5550138ec9
Author: Devosend <de...@gmail.com>
AuthorDate: Tue Jan 4 09:45:54 2022 +0800
[Feature-7347] [Python]Add workflow as code task type flink (#7632)
* add Flink task for code as flow
Co-authored-by: Jiajie Zhong <zh...@gmail.com>
---
.../examples/task_flink_example.py | 31 ++++++
.../src/pydolphinscheduler/constants.py | 1 +
.../src/pydolphinscheduler/tasks/flink.py | 117 +++++++++++++++++++++
.../pydolphinscheduler/tests/tasks/test_flink.py | 82 +++++++++++++++
.../server/PythonGatewayServer.java | 38 ++++++-
5 files changed, 267 insertions(+), 2 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_flink_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_flink_example.py
new file mode 100644
index 0000000..1ef259e
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_flink_example.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""An example workflow for task flink."""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.flink import DeployMode, Flink, ProgramType
+
+with ProcessDefinition(name="task_flink_example", tenant="tenant_exists") as pd:
+ task = Flink(
+ name="task_flink",
+ main_class="org.apache.flink.streaming.examples.wordcount.WordCount",
+ main_package="WordCount.jar",
+ program_type=ProgramType.JAVA,
+ deploy_mode=DeployMode.LOCAL,
+ )
+ pd.run()
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 84becbc..4619c91 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -77,6 +77,7 @@ class TaskType(str):
DEPENDENT = "DEPENDENT"
CONDITIONS = "CONDITIONS"
SWITCH = "SWITCH"
+ FLINK = "FLINK"
class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py
new file mode 100644
index 0000000..f732a15
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Flink."""
+
+from typing import Dict, Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.java_gateway import launch_gateway
+
+
+class ProgramType(str):
+ """Type of program flink runs, for now it just contains `JAVA`, `SCALA` and `PYTHON`."""
+
+ JAVA = "JAVA"
+ SCALA = "SCALA"
+ PYTHON = "PYTHON"
+
+
+class FlinkVersion(str):
+ """Flink version, for now it just contains `LOW_VERSION` and `HIGHT_VERSION`."""
+
+ LOW_VERSION = "<1.10"
+ HIGHT_VERSION = ">=1.10"
+
+
+class DeployMode(str):
+ """Flink deploy mode, for now it just contains `LOCAL` and `CLUSTER`."""
+
+ LOCAL = "local"
+ CLUSTER = "cluster"
+
+
+class Flink(Task):
+ """Task flink object, declare behavior for flink task to dolphinscheduler."""
+
+ _task_custom_attr = {
+ "main_class",
+ "main_jar",
+ "deploy_mode",
+ "flink_version",
+ "slot",
+ "task_manager",
+ "job_manager_memory",
+ "task_manager_memory",
+ "app_name",
+ "program_type",
+ "parallelism",
+ "main_args",
+ "others",
+ }
+
+ def __init__(
+ self,
+ name: str,
+ main_class: str,
+ main_package: str,
+ program_type: Optional[ProgramType] = ProgramType.SCALA,
+ deploy_mode: Optional[DeployMode] = DeployMode.CLUSTER,
+ flink_version: Optional[FlinkVersion] = FlinkVersion.LOW_VERSION,
+ app_name: Optional[str] = None,
+ job_manager_memory: Optional[str] = "1G",
+ task_manager_memory: Optional[str] = "2G",
+ slot: Optional[int] = 1,
+ task_manager: Optional[int] = 2,
+ parallelism: Optional[int] = 1,
+ main_args: Optional[str] = None,
+ others: Optional[str] = None,
+ *args,
+ **kwargs
+ ):
+ super().__init__(name, TaskType.FLINK, *args, **kwargs)
+ self.main_class = main_class
+ self.main_package = main_package
+ self.program_type = program_type
+ self.deploy_mode = deploy_mode
+ self.flink_version = flink_version
+ self.app_name = app_name
+ self.job_manager_memory = job_manager_memory
+ self.task_manager_memory = task_manager_memory
+ self.slot = slot
+ self.task_manager = task_manager
+ self.parallelism = parallelism
+ self.main_args = main_args
+ self.others = others
+ self._resource = {}
+
+ @property
+ def main_jar(self) -> Dict:
+ """Return the main package as a dict holding its resource id."""
+ resource_info = self.get_resource_info(self.program_type, self.main_package)
+ return {"id": resource_info.get("id")}
+
+ def get_resource_info(self, program_type, main_package) -> Dict:
+ """Get resource info from java gateway, containing resource id and name."""
+ if not self._resource:
+ self._resource = launch_gateway().entry_point.getResourcesFileInfo(
+ program_type,
+ main_package,
+ )
+
+ return self._resource
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
new file mode 100644
index 0000000..743bdae
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Flink."""
+
+from unittest.mock import patch
+
+from pydolphinscheduler.tasks.flink import DeployMode, Flink, FlinkVersion, ProgramType
+
+
+@patch(
+ "pydolphinscheduler.tasks.flink.Flink.get_resource_info",
+ return_value=({"id": 1, "name": "test"}),
+)
+def test_flink_get_define(mock_resource):
+ """Test task flink function get_define."""
+ code = 123
+ version = 1
+ name = "test_flink_get_define"
+ main_class = "org.apache.flink.test_main_class"
+ main_package = "test_main_package"
+ program_type = ProgramType.JAVA
+ deploy_mode = DeployMode.LOCAL
+
+ expect = {
+ "code": code,
+ "name": name,
+ "version": 1,
+ "description": None,
+ "delayTime": 0,
+ "taskType": "FLINK",
+ "taskParams": {
+ "mainClass": main_class,
+ "mainJar": {
+ "id": 1,
+ },
+ "programType": program_type,
+ "deployMode": deploy_mode,
+ "flinkVersion": FlinkVersion.LOW_VERSION,
+ "slot": 1,
+ "parallelism": 1,
+ "taskManager": 2,
+ "jobManagerMemory": "1G",
+ "taskManagerMemory": "2G",
+ "appName": None,
+ "mainArgs": None,
+ "others": None,
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
+ },
+ "flag": "YES",
+ "taskPriority": "MEDIUM",
+ "workerGroup": "default",
+ "failRetryTimes": 0,
+ "failRetryInterval": 1,
+ "timeoutFlag": "CLOSE",
+ "timeoutNotifyStrategy": None,
+ "timeout": 0,
+ }
+ with patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(code, version),
+ ):
+ task = Flink(name, main_class, main_package, program_type, deploy_mode)
+ assert task.get_define() == expect
diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index bc50852..ef6deb4 100644
--- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.server;
+import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.QueueService;
+import org.apache.dolphinscheduler.api.service.ResourcesService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.service.TenantService;
@@ -31,6 +33,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
+import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
@@ -50,11 +53,13 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
@@ -68,10 +73,12 @@ import org.springframework.context.annotation.ComponentScan;
import py4j.GatewayServer;
+import org.apache.commons.collections.CollectionUtils;
+
@SpringBootApplication
@ComponentScan(value = "org.apache.dolphinscheduler")
public class PythonGatewayServer extends SpringBootServletInitializer {
- private static final Logger LOGGER = LoggerFactory.getLogger(PythonGatewayServer.class);
+ private static final Logger logger = LoggerFactory.getLogger(PythonGatewayServer.class);
private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE;
private static final int DEFAULT_WARNING_GROUP_ID = 0;
@@ -108,6 +115,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
private QueueService queueService;
@Autowired
+ private ResourcesService resourceService;
+
+ @Autowired
private ProjectMapper projectMapper;
@Autowired
@@ -244,7 +254,7 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
} else if (verifyStatus != Status.SUCCESS) {
String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST.";
- LOGGER.error(msg);
+ logger.error(msg);
throw new RuntimeException(msg);
}
@@ -465,6 +475,30 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
return result;
}
+ /**
+ * Get resource by given program type and full name. It returns a map containing the resource id and name.
+ * Useful when the Python API creates a flink task, which needs resource information.
+ *
+ * @param programType program type one of SCALA, JAVA and PYTHON
+ * @param fullName full name of the resource
+ */
+ public Map<String, Object> getResourcesFileInfo(String programType, String fullName) {
+ Map<String, Object> result = new HashMap<>();
+
+ Map<String, Object> resources = resourceService.queryResourceByProgramType(dummyAdminUser, ResourceType.FILE, ProgramType.valueOf(programType));
+ List<ResourceComponent> resourcesComponent = (List<ResourceComponent>) resources.get(Constants.DATA_LIST);
+ List<ResourceComponent> namedResources = resourcesComponent.stream().filter(s -> fullName.equals(s.getFullName())).collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(namedResources)) {
+ String msg = String.format("Can not find valid resource by program type %s and name %s", programType, fullName);
+ logger.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+
+ result.put("id", namedResources.get(0).getId());
+ result.put("name", namedResources.get(0).getName());
+ return result;
+ }
+
@PostConstruct
public void run() {
GatewayServer server = new GatewayServer(this);