You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/01/15 15:25:04 UTC

[dolphinscheduler] branch dev updated: [Feature-7346] [Python]Add workflow as code task type spark (#7968)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 027af09  [Feature-7346] [Python]Add workflow as code task type spark (#7968)
027af09 is described below

commit 027af091c736a801df3082ea5323d6821a7d64dd
Author: Devosend <de...@gmail.com>
AuthorDate: Sat Jan 15 23:24:55 2022 +0800

    [Feature-7346] [Python]Add workflow as code task type spark (#7968)
    
    * add spark task
    
    * fix code format
    
    * add parent class for flink and spark
    
    * modify Engine docstring
    
    * modify docstring of Engine
---
 .../examples/task_spark_example.py                 |  31 +++++
 .../src/pydolphinscheduler/constants.py            |   1 +
 .../src/pydolphinscheduler/core/engine.py          |  95 +++++++++++++
 .../src/pydolphinscheduler/tasks/flink.py          |  48 ++-----
 .../src/pydolphinscheduler/tasks/spark.py          |  94 +++++++++++++
 .../pydolphinscheduler/tests/core/test_engine.py   | 147 +++++++++++++++++++++
 .../pydolphinscheduler/tests/tasks/test_datax.py   |   2 -
 .../pydolphinscheduler/tests/tasks/test_flink.py   |   2 +-
 .../tests/tasks/{test_flink.py => test_spark.py}   |  30 ++---
 .../server/PythonGatewayServer.java                |   2 +-
 10 files changed, 397 insertions(+), 55 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_spark_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_spark_example.py
new file mode 100644
index 0000000..6cace77
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_spark_example.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""An example workflow for task spark."""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark
+
+with ProcessDefinition(name="task_spark_example", tenant="tenant_exists") as pd:
+    task = Spark(
+        name="task_spark",
+        main_class="org.apache.spark.examples.SparkPi",
+        main_package="spark-examples_2.12-3.2.0.jar",
+        program_type=ProgramType.JAVA,
+        deploy_mode=DeployMode.LOCAL,
+    )
+    pd.run()
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 4619c91..7f63a82 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -78,6 +78,7 @@ class TaskType(str):
     CONDITIONS = "CONDITIONS"
     SWITCH = "SWITCH"
     FLINK = "FLINK"
+    SPARK = "SPARK"
 
 
 class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py
new file mode 100644
index 0000000..df84b5b
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Module engine."""
+
+from typing import Dict, Optional
+
+from py4j.protocol import Py4JJavaError
+
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.java_gateway import launch_gateway
+
+
+class ProgramType(str):
+    """Type of program engine runs, for now it just contains `JAVA`, `SCALA` and `PYTHON`."""
+
+    JAVA = "JAVA"
+    SCALA = "SCALA"
+    PYTHON = "PYTHON"
+
+
+class Engine(Task):
+    """Task engine object, declare behavior for engine task to dolphinscheduler.
+
+    This is the parent class of spark, flink and mr tasks,
+    and is used to provide the programType, mainClass and mainJar task parameters for reuse.
+    """
+
+    def __init__(
+        self,
+        name: str,
+        task_type: str,
+        main_class: str,
+        main_package: str,
+        program_type: Optional[ProgramType] = ProgramType.SCALA,
+        *args,
+        **kwargs
+    ):
+        super().__init__(name, task_type, *args, **kwargs)
+        self.main_class = main_class
+        self.main_package = main_package
+        self.program_type = program_type
+        self._resource = {}
+
+    def get_resource_info(self, program_type, main_package):
+        """Get resource info from java gateway, contains resource id, name."""
+        if self._resource:
+            return self._resource
+        else:
+            gateway = launch_gateway()
+            try:
+                self._resource = gateway.entry_point.getResourcesFileInfo(
+                    program_type, main_package
+                )
+            # Handle resource does not exist error, for now we just terminate the process.
+            except Py4JJavaError as ex:
+                raise PyDSParamException(str(ex.java_exception))
+            return self._resource
+
+    def get_jar_id(self) -> int:
+        """Get jar id from java gateway, a wrapper for :func:`get_resource_info`."""
+        return self.get_resource_info(self.program_type, self.main_package).get("id")
+
+    @property
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+        """Override Task.task_params for engine children task.
+
+        Children tasks have some special attributes for task_params, and it is odd if we
+        directly set them as python properties, so we override Task.task_params here.
+        """
+        params = super().task_params
+        custom_params = {
+            "programType": self.program_type,
+            "mainClass": self.main_class,
+            "mainJar": {
+                "id": self.get_jar_id(),
+            },
+        }
+        params.update(custom_params)
+        return params
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py
index f732a15..83cae95 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py
@@ -17,19 +17,10 @@
 
 """Task Flink."""
 
-from typing import Dict, Optional
+from typing import Optional
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
-from pydolphinscheduler.java_gateway import launch_gateway
-
-
-class ProgramType(str):
-    """Type of program flink runs, for now it just contain `JAVA`, `SCALA` and `PYTHON`."""
-
-    JAVA = "JAVA"
-    SCALA = "SCALA"
-    PYTHON = "PYTHON"
+from pydolphinscheduler.core.engine import Engine, ProgramType
 
 
 class FlinkVersion(str):
@@ -46,12 +37,10 @@ class DeployMode(str):
     CLUSTER = "cluster"
 
 
-class Flink(Task):
+class Flink(Engine):
     """Task flink object, declare behavior for flink task to dolphinscheduler."""
 
     _task_custom_attr = {
-        "main_class",
-        "main_jar",
         "deploy_mode",
         "flink_version",
         "slot",
@@ -59,7 +48,6 @@ class Flink(Task):
         "job_manager_memory",
         "task_manager_memory",
         "app_name",
-        "program_type",
         "parallelism",
         "main_args",
         "others",
@@ -84,10 +72,15 @@ class Flink(Task):
         *args,
         **kwargs
     ):
-        super().__init__(name, TaskType.FLINK, *args, **kwargs)
-        self.main_class = main_class
-        self.main_package = main_package
-        self.program_type = program_type
+        super().__init__(
+            name,
+            TaskType.FLINK,
+            main_class,
+            main_package,
+            program_type,
+            *args,
+            **kwargs
+        )
         self.deploy_mode = deploy_mode
         self.flink_version = flink_version
         self.app_name = app_name
@@ -98,20 +91,3 @@ class Flink(Task):
         self.parallelism = parallelism
         self.main_args = main_args
         self.others = others
-        self._resource = {}
-
-    @property
-    def main_jar(self) -> Dict:
-        """Return main package of dict."""
-        resource_info = self.get_resource_info(self.program_type, self.main_package)
-        return {"id": resource_info.get("id")}
-
-    def get_resource_info(self, program_type, main_package) -> Dict:
-        """Get resource info from java gateway, contains resource id, name."""
-        if not self._resource:
-            self._resource = launch_gateway().entry_point.getResourcesFileInfo(
-                program_type,
-                main_package,
-            )
-
-        return self._resource
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py
new file mode 100644
index 0000000..565daad
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Spark."""
+
+from typing import Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.engine import Engine, ProgramType
+
+
+class SparkVersion(str):
+    """Spark version, for now it just contains `SPARK1` and `SPARK2`."""
+
+    SPARK1 = "SPARK1"
+    SPARK2 = "SPARK2"
+
+
+class DeployMode(str):
+    """Spark deploy mode, for now it just contains `LOCAL`, `CLIENT` and `CLUSTER`."""
+
+    LOCAL = "local"
+    CLIENT = "client"
+    CLUSTER = "cluster"
+
+
+class Spark(Engine):
+    """Task spark object, declare behavior for spark task to dolphinscheduler."""
+
+    _task_custom_attr = {
+        "deploy_mode",
+        "spark_version",
+        "driver_cores",
+        "driver_memory",
+        "num_executors",
+        "executor_memory",
+        "executor_cores",
+        "app_name",
+        "main_args",
+        "others",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        main_class: str,
+        main_package: str,
+        program_type: Optional[ProgramType] = ProgramType.SCALA,
+        deploy_mode: Optional[DeployMode] = DeployMode.CLUSTER,
+        spark_version: Optional[SparkVersion] = SparkVersion.SPARK2,
+        app_name: Optional[str] = None,
+        driver_cores: Optional[int] = 1,
+        driver_memory: Optional[str] = "512M",
+        num_executors: Optional[int] = 2,
+        executor_memory: Optional[str] = "2G",
+        executor_cores: Optional[int] = 2,
+        main_args: Optional[str] = None,
+        others: Optional[str] = None,
+        *args,
+        **kwargs
+    ):
+        super().__init__(
+            name,
+            TaskType.SPARK,
+            main_class,
+            main_package,
+            program_type,
+            *args,
+            **kwargs
+        )
+        self.deploy_mode = deploy_mode
+        self.spark_version = spark_version
+        self.app_name = app_name
+        self.driver_cores = driver_cores
+        self.driver_memory = driver_memory
+        self.num_executors = num_executors
+        self.executor_memory = executor_memory
+        self.executor_cores = executor_cores
+        self.main_args = main_args
+        self.others = others
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py
new file mode 100644
index 0000000..e36c47b
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Engine."""
+
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.core.engine import Engine, ProgramType
+
+TEST_ENGINE_TASK_TYPE = "ENGINE"
+TEST_MAIN_CLASS = "org.apache.examples.mock.Mock"
+TEST_MAIN_PACKAGE = "Mock.jar"
+TEST_PROGRAM_TYPE = ProgramType.JAVA
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.core.engine.Engine.get_resource_info",
+    return_value=({"id": 1, "name": "mock_name"}),
+)
+def test_get_jar_detail(mock_resource, mock_code_version):
+    """Test :func:`get_jar_id` can return expected value."""
+    name = "test_get_jar_detail"
+    task = Engine(
+        name,
+        TEST_ENGINE_TASK_TYPE,
+        TEST_MAIN_CLASS,
+        TEST_MAIN_PACKAGE,
+        TEST_PROGRAM_TYPE,
+    )
+    assert 1 == task.get_jar_id()
+
+
+@pytest.mark.parametrize(
+    "attr, expect",
+    [
+        (
+            {
+                "name": "test-task-params",
+                "task_type": "test-engine",
+                "main_class": "org.apache.examples.mock.Mock",
+                "main_package": "TestMock.jar",
+                "program_type": ProgramType.JAVA,
+            },
+            {
+                "mainClass": "org.apache.examples.mock.Mock",
+                "mainJar": {
+                    "id": 1,
+                },
+                "programType": ProgramType.JAVA,
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+                "waitStartTimeout": {},
+            },
+        )
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.core.engine.Engine.get_resource_info",
+    return_value=({"id": 1, "name": "mock_name"}),
+)
+def test_property_task_params(mock_resource, mock_code_version, attr, expect):
+    """Test task engine task property."""
+    task = Engine(**attr)
+    assert expect == task.task_params
+
+
+@pytest.mark.parametrize(
+    "attr, expect",
+    [
+        (
+            {
+                "name": "test-task-test_engine_get_define",
+                "task_type": "test-engine",
+                "main_class": "org.apache.examples.mock.Mock",
+                "main_package": "TestMock.jar",
+                "program_type": ProgramType.JAVA,
+            },
+            {
+                "code": 123,
+                "name": "test-task-test_engine_get_define",
+                "version": 1,
+                "description": None,
+                "delayTime": 0,
+                "taskType": "test-engine",
+                "taskParams": {
+                    "mainClass": "org.apache.examples.mock.Mock",
+                    "mainJar": {
+                        "id": 1,
+                    },
+                    "programType": ProgramType.JAVA,
+                    "localParams": [],
+                    "resourceList": [],
+                    "dependence": {},
+                    "conditionResult": {"successNode": [""], "failedNode": [""]},
+                    "waitStartTimeout": {},
+                },
+                "flag": "YES",
+                "taskPriority": "MEDIUM",
+                "workerGroup": "default",
+                "failRetryTimes": 0,
+                "failRetryInterval": 1,
+                "timeoutFlag": "CLOSE",
+                "timeoutNotifyStrategy": None,
+                "timeout": 0,
+            },
+        )
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.core.engine.Engine.get_resource_info",
+    return_value=({"id": 1, "name": "mock_name"}),
+)
+def test_engine_get_define(mock_resource, mock_code_version, attr, expect):
+    """Test task engine function get_define."""
+    task = Engine(**attr)
+    assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
index 7fa4569..9473f57 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
@@ -119,6 +119,4 @@ def test_custom_datax_get_define(json_template):
         return_value=(code, version),
     ):
         task = CustomDataX(name, json_template)
-        print(task.get_define())
-        print(expect)
         assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
index 743bdae..92ae3ba 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
@@ -23,7 +23,7 @@ from pydolphinscheduler.tasks.flink import DeployMode, Flink, FlinkVersion, Prog
 
 
 @patch(
-    "pydolphinscheduler.tasks.flink.Flink.get_resource_info",
+    "pydolphinscheduler.core.engine.Engine.get_resource_info",
     return_value=({"id": 1, "name": "test"}),
 )
 def test_flink_get_define(mock_resource):
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
similarity index 76%
copy from dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
copy to dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
index 743bdae..3b0672f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
@@ -15,23 +15,23 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test Task Flink."""
+"""Test Task Spark."""
 
 from unittest.mock import patch
 
-from pydolphinscheduler.tasks.flink import DeployMode, Flink, FlinkVersion, ProgramType
+from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark, SparkVersion
 
 
 @patch(
-    "pydolphinscheduler.tasks.flink.Flink.get_resource_info",
+    "pydolphinscheduler.core.engine.Engine.get_resource_info",
     return_value=({"id": 1, "name": "test"}),
 )
-def test_flink_get_define(mock_resource):
-    """Test task flink function get_define."""
+def test_spark_get_define(mock_resource):
+    """Test task spark function get_define."""
     code = 123
     version = 1
-    name = "test_flink_get_define"
-    main_class = "org.apache.flink.test_main_class"
+    name = "test_spark_get_define"
+    main_class = "org.apache.spark.test_main_class"
     main_package = "test_main_package"
     program_type = ProgramType.JAVA
     deploy_mode = DeployMode.LOCAL
@@ -42,7 +42,7 @@ def test_flink_get_define(mock_resource):
         "version": 1,
         "description": None,
         "delayTime": 0,
-        "taskType": "FLINK",
+        "taskType": "SPARK",
         "taskParams": {
             "mainClass": main_class,
             "mainJar": {
@@ -50,12 +50,12 @@ def test_flink_get_define(mock_resource):
             },
             "programType": program_type,
             "deployMode": deploy_mode,
-            "flinkVersion": FlinkVersion.LOW_VERSION,
-            "slot": 1,
-            "parallelism": 1,
-            "taskManager": 2,
-            "jobManagerMemory": "1G",
-            "taskManagerMemory": "2G",
+            "sparkVersion": SparkVersion.SPARK2,
+            "driverCores": 1,
+            "driverMemory": "512M",
+            "numExecutors": 2,
+            "executorMemory": "2G",
+            "executorCores": 2,
             "appName": None,
             "mainArgs": None,
             "others": None,
@@ -78,5 +78,5 @@ def test_flink_get_define(mock_resource):
         "pydolphinscheduler.core.task.Task.gen_code_and_version",
         return_value=(code, version),
     ):
-        task = Flink(name, main_class, main_package, program_type, deploy_mode)
+        task = Spark(name, main_class, main_package, program_type, deploy_mode)
         assert task.get_define() == expect
diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index 9489894..2b06286 100644
--- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -477,7 +477,7 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
 
     /**
      * Get resource by given program type and full name. It return map contain resource id, name.
-     * Useful in Python API create flink task which need processDefinition information.
+     * Useful when the Python API creates a flink or spark task which needs processDefinition information.
      *
      * @param programType program type one of SCALA, JAVA and PYTHON
      * @param fullName    full name of the resource