You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/01/21 01:54:43 UTC

[dolphinscheduler] branch dev updated: [Feature-7348] [Python] Add workflow as code task type mr (#8140)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7aeac75  [Feature-7348] [Python] Add workflow as code task type mr (#8140)
7aeac75 is described below

commit 7aeac75860d5d80c4da0e40b26723acb7cd02f81
Author: Devosend <de...@gmail.com>
AuthorDate: Fri Jan 21 09:54:36 2022 +0800

    [Feature-7348] [Python] Add workflow as code task type mr (#8140)
---
 .../examples/task_map_reduce_example.py            | 32 +++++++++
 .../src/pydolphinscheduler/constants.py            |  1 +
 .../src/pydolphinscheduler/tasks/map_reduce.py     | 52 +++++++++++++++
 .../tests/tasks/test_map_reduce.py                 | 75 ++++++++++++++++++++++
 4 files changed, 160 insertions(+)

diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_map_reduce_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_map_reduce_example.py
new file mode 100644
index 0000000..75e09a9
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_map_reduce_example.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""An example workflow for task mr."""
+
+from pydolphinscheduler.core.engine import ProgramType
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.map_reduce import MR
+
+# Build the workflow inside the ProcessDefinition context; `pd` is the handle
+# used to run it at the end of the block.
+with ProcessDefinition(name="task_map_reduce_example", tenant="tenant_exists") as pd:
+    # A single MR task that runs `wordcount` from the Hadoop MapReduce
+    # examples jar.
+    # NOTE(review): `main_package` is presumably looked up among resources
+    # already uploaded to DolphinScheduler -- confirm the jar exists there.
+    task = MR(
+        name="task_mr",
+        main_class="wordcount",
+        main_package="hadoop-mapreduce-examples-3.3.1.jar",
+        program_type=ProgramType.JAVA,
+        # Argument string for the job; judging by the paths this is an input
+        # file followed by an output directory.
+        main_args="/dolphinscheduler/tenant_exists/resources/file.txt /output/ds",
+    )
+    # NOTE(review): `task` is never handed to `pd` explicitly; presumably it
+    # attaches to the enclosing ProcessDefinition on construction -- confirm.
+    pd.run()
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 7f63a82..7bd71b9 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -79,6 +79,7 @@ class TaskType(str):
     SWITCH = "SWITCH"
     FLINK = "FLINK"
     SPARK = "SPARK"
+    MR = "MR"
 
 
 class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py
new file mode 100644
index 0000000..5050bd3
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task MR."""
+
+from typing import Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.engine import Engine, ProgramType
+
+
+class MR(Engine):
+    """MapReduce task definition for dolphinscheduler.
+
+    Builds on :class:`Engine` and adds the MR-only fields ``app_name``,
+    ``main_args`` and ``others``.
+    """
+
+    # Names of the attributes specific to this task type; each one is set on
+    # the instance in ``__init__``.
+    _task_custom_attr = {"app_name", "main_args", "others"}
+
+    def __init__(
+        self,
+        name: str,
+        main_class: str,
+        main_package: str,
+        program_type: Optional[ProgramType] = ProgramType.SCALA,
+        app_name: Optional[str] = None,
+        main_args: Optional[str] = None,
+        others: Optional[str] = None,
+        *args,
+        **kwargs
+    ):
+        """Create an MR task.
+
+        ``name``, ``main_class``, ``main_package`` and ``program_type`` are
+        forwarded to ``Engine`` together with the fixed task type
+        ``TaskType.MR``; any extra positional or keyword arguments are passed
+        along unchanged. ``app_name``, ``main_args`` and ``others`` are
+        stored on the instance as given.
+        """
+        engine_args = (name, TaskType.MR, main_class, main_package, program_type)
+        super().__init__(*engine_args, *args, **kwargs)
+        self.app_name, self.main_args, self.others = app_name, main_args, others
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
new file mode 100644
index 0000000..dbe9e51
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task MR."""
+
+from unittest.mock import patch
+
+from pydolphinscheduler.tasks.map_reduce import MR, ProgramType
+
+
+@patch(
+    "pydolphinscheduler.core.engine.Engine.get_resource_info",
+    return_value=({"id": 1, "name": "test"}),
+)
+def test_mr_get_define(mock_resource):
+    """Check that ``MR.get_define`` returns the expected task definition."""
+    code, version = 123, 1
+    name = "test_mr_get_define"
+    main_class = "org.apache.mr.test_main_class"
+    main_package = "test_main_package"
+    program_type = ProgramType.JAVA
+    main_args = "/dolphinscheduler/resources/file.txt /output/ds"
+
+    # Payload expected under "taskParams"; "mainJar" carries the id returned
+    # by the patched ``Engine.get_resource_info``.
+    task_params = {
+        "mainClass": main_class,
+        "mainJar": {
+            "id": 1,
+        },
+        "programType": program_type,
+        "appName": None,
+        "mainArgs": main_args,
+        "others": None,
+        "localParams": [],
+        "resourceList": [],
+        "dependence": {},
+        "conditionResult": {"successNode": [""], "failedNode": [""]},
+        "waitStartTimeout": {},
+    }
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "MR",
+        "taskParams": task_params,
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+
+    # Pin the generated code and version so the definition is deterministic.
+    pinned_code_and_version = patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    )
+    with pinned_code_and_version:
+        task = MR(name, main_class, main_package, program_type, main_args=main_args)
+        actual = task.get_define()
+        assert actual == expect