You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/01/21 01:54:43 UTC
[dolphinscheduler] branch dev updated: [Feature-7348] [Python] Add workflow as code task type mr (#8140)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 7aeac75 [Feature-7348] [Python] Add workflow as code task type mr (#8140)
7aeac75 is described below
commit 7aeac75860d5d80c4da0e40b26723acb7cd02f81
Author: Devosend <de...@gmail.com>
AuthorDate: Fri Jan 21 09:54:36 2022 +0800
[Feature-7348] [Python] Add workflow as code task type mr (#8140)
---
.../examples/task_map_reduce_example.py | 32 +++++++++
.../src/pydolphinscheduler/constants.py | 1 +
.../src/pydolphinscheduler/tasks/map_reduce.py | 52 +++++++++++++++
.../tests/tasks/test_map_reduce.py | 75 ++++++++++++++++++++++
4 files changed, 160 insertions(+)
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_map_reduce_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_map_reduce_example.py
new file mode 100644
index 0000000..75e09a9
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_map_reduce_example.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""An example workflow for task mr."""
+
+from pydolphinscheduler.core.engine import ProgramType
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.map_reduce import MR
+
+with ProcessDefinition(name="task_map_reduce_example", tenant="tenant_exists") as pd:
+ task = MR(
+ name="task_mr",
+ main_class="wordcount",
+ main_package="hadoop-mapreduce-examples-3.3.1.jar",
+ program_type=ProgramType.JAVA,
+ main_args="/dolphinscheduler/tenant_exists/resources/file.txt /output/ds",
+ )
+ pd.run()
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 7f63a82..7bd71b9 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -79,6 +79,7 @@ class TaskType(str):
SWITCH = "SWITCH"
FLINK = "FLINK"
SPARK = "SPARK"
+ MR = "MR"
class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py
new file mode 100644
index 0000000..5050bd3
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task MR."""
+
+from typing import Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.engine import Engine, ProgramType
+
+
+class MR(Engine):
+ """Task MR object, which declares the behavior of an MR task to dolphinscheduler."""
+
+ _task_custom_attr = {
+ "app_name",
+ "main_args",
+ "others",
+ }
+
+ def __init__(
+ self,
+ name: str,
+ main_class: str,
+ main_package: str,
+ program_type: Optional[ProgramType] = ProgramType.SCALA,
+ app_name: Optional[str] = None,
+ main_args: Optional[str] = None,
+ others: Optional[str] = None,
+ *args,
+ **kwargs
+ ):
+ super().__init__(
+ name, TaskType.MR, main_class, main_package, program_type, *args, **kwargs
+ )
+ self.app_name = app_name
+ self.main_args = main_args
+ self.others = others
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
new file mode 100644
index 0000000..dbe9e51
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task MR."""
+
+from unittest.mock import patch
+
+from pydolphinscheduler.tasks.map_reduce import MR, ProgramType
+
+
+@patch(
+ "pydolphinscheduler.core.engine.Engine.get_resource_info",
+ return_value=({"id": 1, "name": "test"}),
+)
+def test_mr_get_define(mock_resource):
+ """Test the get_define function of task MR."""
+ code = 123
+ version = 1
+ name = "test_mr_get_define"
+ main_class = "org.apache.mr.test_main_class"
+ main_package = "test_main_package"
+ program_type = ProgramType.JAVA
+ main_args = "/dolphinscheduler/resources/file.txt /output/ds"
+
+ expect = {
+ "code": code,
+ "name": name,
+ "version": 1,
+ "description": None,
+ "delayTime": 0,
+ "taskType": "MR",
+ "taskParams": {
+ "mainClass": main_class,
+ "mainJar": {
+ "id": 1,
+ },
+ "programType": program_type,
+ "appName": None,
+ "mainArgs": main_args,
+ "others": None,
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
+ },
+ "flag": "YES",
+ "taskPriority": "MEDIUM",
+ "workerGroup": "default",
+ "failRetryTimes": 0,
+ "failRetryInterval": 1,
+ "timeoutFlag": "CLOSE",
+ "timeoutNotifyStrategy": None,
+ "timeout": 0,
+ }
+ with patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(code, version),
+ ):
+ task = MR(name, main_class, main_package, program_type, main_args=main_args)
+ assert task.get_define() == expect