You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/01/15 15:25:04 UTC
[dolphinscheduler] branch dev updated: [Feature-7346] [Python]Add workflow as code task type spark (#7968)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 027af09 [Feature-7346] [Python]Add workflow as code task type spark (#7968)
027af09 is described below
commit 027af091c736a801df3082ea5323d6821a7d64dd
Author: Devosend <de...@gmail.com>
AuthorDate: Sat Jan 15 23:24:55 2022 +0800
[Feature-7346] [Python]Add workflow as code task type spark (#7968)
* add spark task
* fix code format
* add parent class for flink and spark
* modify Engine docstring
* modify docstring of Engine
---
.../examples/task_spark_example.py | 31 +++++
.../src/pydolphinscheduler/constants.py | 1 +
.../src/pydolphinscheduler/core/engine.py | 95 +++++++++++++
.../src/pydolphinscheduler/tasks/flink.py | 48 ++-----
.../src/pydolphinscheduler/tasks/spark.py | 94 +++++++++++++
.../pydolphinscheduler/tests/core/test_engine.py | 147 +++++++++++++++++++++
.../pydolphinscheduler/tests/tasks/test_datax.py | 2 -
.../pydolphinscheduler/tests/tasks/test_flink.py | 2 +-
.../tests/tasks/{test_flink.py => test_spark.py} | 30 ++---
.../server/PythonGatewayServer.java | 2 +-
10 files changed, 397 insertions(+), 55 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_spark_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_spark_example.py
new file mode 100644
index 0000000..6cace77
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_spark_example.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""An example workflow for task spark."""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark
+
+with ProcessDefinition(name="task_spark_example", tenant="tenant_exists") as pd:
+ task = Spark(
+ name="task_spark",
+ main_class="org.apache.spark.examples.SparkPi",
+ main_package="spark-examples_2.12-3.2.0.jar",
+ program_type=ProgramType.JAVA,
+ deploy_mode=DeployMode.LOCAL,
+ )
+ pd.run()
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 4619c91..7f63a82 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -78,6 +78,7 @@ class TaskType(str):
CONDITIONS = "CONDITIONS"
SWITCH = "SWITCH"
FLINK = "FLINK"
+ SPARK = "SPARK"
class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py
new file mode 100644
index 0000000..df84b5b
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Module engine."""
+
+from typing import Dict, Optional
+
+from py4j.protocol import Py4JJavaError
+
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.java_gateway import launch_gateway
+
+
+class ProgramType(str):
+ """Type of program engine runs, for now it just contains `JAVA`, `SCALA` and `PYTHON`."""
+
+ JAVA = "JAVA"
+ SCALA = "SCALA"
+ PYTHON = "PYTHON"
+
+
+class Engine(Task):
+ """Task engine object, declare behavior for engine task to dolphinscheduler.
+
+ This is the parent class of spark, flink and mr tasks,
+ and is used to provide the programType, mainClass and mainJar task parameters for reuse.
+ """
+
+ def __init__(
+ self,
+ name: str,
+ task_type: str,
+ main_class: str,
+ main_package: str,
+ program_type: Optional[ProgramType] = ProgramType.SCALA,
+ *args,
+ **kwargs
+ ):
+ super().__init__(name, task_type, *args, **kwargs)
+ self.main_class = main_class
+ self.main_package = main_package
+ self.program_type = program_type
+ self._resource = {}
+
+ def get_resource_info(self, program_type, main_package):
+ """Get resource info from java gateway, contains resource id, name."""
+ if self._resource:
+ return self._resource
+ else:
+ gateway = launch_gateway()
+ try:
+ self._resource = gateway.entry_point.getResourcesFileInfo(
+ program_type, main_package
+ )
+ # Handle the resource-does-not-exist error; for now we just terminate the process.
+ except Py4JJavaError as ex:
+ raise PyDSParamException(str(ex.java_exception))
+ return self._resource
+
+ def get_jar_id(self) -> int:
+ """Get jar id from java gateway, a wrapper for :func:`get_resource_info`."""
+ return self.get_resource_info(self.program_type, self.main_package).get("id")
+
+ @property
+ def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+ """Override Task.task_params for engine children task.
+
+ Children tasks have special attributes for task_params, and it is odd to set
+ them directly as python properties, so we override Task.task_params here.
+ """
+ params = super().task_params
+ custom_params = {
+ "programType": self.program_type,
+ "mainClass": self.main_class,
+ "mainJar": {
+ "id": self.get_jar_id(),
+ },
+ }
+ params.update(custom_params)
+ return params
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py
index f732a15..83cae95 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py
@@ -17,19 +17,10 @@
"""Task Flink."""
-from typing import Dict, Optional
+from typing import Optional
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
-from pydolphinscheduler.java_gateway import launch_gateway
-
-
-class ProgramType(str):
- """Type of program flink runs, for now it just contain `JAVA`, `SCALA` and `PYTHON`."""
-
- JAVA = "JAVA"
- SCALA = "SCALA"
- PYTHON = "PYTHON"
+from pydolphinscheduler.core.engine import Engine, ProgramType
class FlinkVersion(str):
@@ -46,12 +37,10 @@ class DeployMode(str):
CLUSTER = "cluster"
-class Flink(Task):
+class Flink(Engine):
"""Task flink object, declare behavior for flink task to dolphinscheduler."""
_task_custom_attr = {
- "main_class",
- "main_jar",
"deploy_mode",
"flink_version",
"slot",
@@ -59,7 +48,6 @@ class Flink(Task):
"job_manager_memory",
"task_manager_memory",
"app_name",
- "program_type",
"parallelism",
"main_args",
"others",
@@ -84,10 +72,15 @@ class Flink(Task):
*args,
**kwargs
):
- super().__init__(name, TaskType.FLINK, *args, **kwargs)
- self.main_class = main_class
- self.main_package = main_package
- self.program_type = program_type
+ super().__init__(
+ name,
+ TaskType.FLINK,
+ main_class,
+ main_package,
+ program_type,
+ *args,
+ **kwargs
+ )
self.deploy_mode = deploy_mode
self.flink_version = flink_version
self.app_name = app_name
@@ -98,20 +91,3 @@ class Flink(Task):
self.parallelism = parallelism
self.main_args = main_args
self.others = others
- self._resource = {}
-
- @property
- def main_jar(self) -> Dict:
- """Return main package of dict."""
- resource_info = self.get_resource_info(self.program_type, self.main_package)
- return {"id": resource_info.get("id")}
-
- def get_resource_info(self, program_type, main_package) -> Dict:
- """Get resource info from java gateway, contains resource id, name."""
- if not self._resource:
- self._resource = launch_gateway().entry_point.getResourcesFileInfo(
- program_type,
- main_package,
- )
-
- return self._resource
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py
new file mode 100644
index 0000000..565daad
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Spark."""
+
+from typing import Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.engine import Engine, ProgramType
+
+
+class SparkVersion(str):
+ """Spark version, for now it just contains `SPARK1` and `SPARK2`."""
+
+ SPARK1 = "SPARK1"
+ SPARK2 = "SPARK2"
+
+
+class DeployMode(str):
+ """SPARK deploy mode, for now it just contains `LOCAL`, `CLIENT` and `CLUSTER`."""
+
+ LOCAL = "local"
+ CLIENT = "client"
+ CLUSTER = "cluster"
+
+
+class Spark(Engine):
+ """Task spark object, declare behavior for spark task to dolphinscheduler."""
+
+ _task_custom_attr = {
+ "deploy_mode",
+ "spark_version",
+ "driver_cores",
+ "driver_memory",
+ "num_executors",
+ "executor_memory",
+ "executor_cores",
+ "app_name",
+ "main_args",
+ "others",
+ }
+
+ def __init__(
+ self,
+ name: str,
+ main_class: str,
+ main_package: str,
+ program_type: Optional[ProgramType] = ProgramType.SCALA,
+ deploy_mode: Optional[DeployMode] = DeployMode.CLUSTER,
+ spark_version: Optional[SparkVersion] = SparkVersion.SPARK2,
+ app_name: Optional[str] = None,
+ driver_cores: Optional[int] = 1,
+ driver_memory: Optional[str] = "512M",
+ num_executors: Optional[int] = 2,
+ executor_memory: Optional[str] = "2G",
+ executor_cores: Optional[int] = 2,
+ main_args: Optional[str] = None,
+ others: Optional[str] = None,
+ *args,
+ **kwargs
+ ):
+ super().__init__(
+ name,
+ TaskType.SPARK,
+ main_class,
+ main_package,
+ program_type,
+ *args,
+ **kwargs
+ )
+ self.deploy_mode = deploy_mode
+ self.spark_version = spark_version
+ self.app_name = app_name
+ self.driver_cores = driver_cores
+ self.driver_memory = driver_memory
+ self.num_executors = num_executors
+ self.executor_memory = executor_memory
+ self.executor_cores = executor_cores
+ self.main_args = main_args
+ self.others = others
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py
new file mode 100644
index 0000000..e36c47b
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Engine."""
+
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.core.engine import Engine, ProgramType
+
+TEST_ENGINE_TASK_TYPE = "ENGINE"
+TEST_MAIN_CLASS = "org.apache.examples.mock.Mock"
+TEST_MAIN_PACKAGE = "Mock.jar"
+TEST_PROGRAM_TYPE = ProgramType.JAVA
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+@patch(
+ "pydolphinscheduler.core.engine.Engine.get_resource_info",
+ return_value=({"id": 1, "name": "mock_name"}),
+)
+def test_get_jar_detail(mock_resource, mock_code_version):
+ """Test :func:`get_jar_id` can return the expected value."""
+ name = "test_get_jar_detail"
+ task = Engine(
+ name,
+ TEST_ENGINE_TASK_TYPE,
+ TEST_MAIN_CLASS,
+ TEST_MAIN_PACKAGE,
+ TEST_PROGRAM_TYPE,
+ )
+ assert 1 == task.get_jar_id()
+
+
+@pytest.mark.parametrize(
+ "attr, expect",
+ [
+ (
+ {
+ "name": "test-task-params",
+ "task_type": "test-engine",
+ "main_class": "org.apache.examples.mock.Mock",
+ "main_package": "TestMock.jar",
+ "program_type": ProgramType.JAVA,
+ },
+ {
+ "mainClass": "org.apache.examples.mock.Mock",
+ "mainJar": {
+ "id": 1,
+ },
+ "programType": ProgramType.JAVA,
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
+ },
+ )
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+@patch(
+ "pydolphinscheduler.core.engine.Engine.get_resource_info",
+ return_value=({"id": 1, "name": "mock_name"}),
+)
+def test_property_task_params(mock_resource, mock_code_version, attr, expect):
+ """Test task engine task property."""
+ task = Engine(**attr)
+ assert expect == task.task_params
+
+
+@pytest.mark.parametrize(
+ "attr, expect",
+ [
+ (
+ {
+ "name": "test-task-test_engine_get_define",
+ "task_type": "test-engine",
+ "main_class": "org.apache.examples.mock.Mock",
+ "main_package": "TestMock.jar",
+ "program_type": ProgramType.JAVA,
+ },
+ {
+ "code": 123,
+ "name": "test-task-test_engine_get_define",
+ "version": 1,
+ "description": None,
+ "delayTime": 0,
+ "taskType": "test-engine",
+ "taskParams": {
+ "mainClass": "org.apache.examples.mock.Mock",
+ "mainJar": {
+ "id": 1,
+ },
+ "programType": ProgramType.JAVA,
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
+ },
+ "flag": "YES",
+ "taskPriority": "MEDIUM",
+ "workerGroup": "default",
+ "failRetryTimes": 0,
+ "failRetryInterval": 1,
+ "timeoutFlag": "CLOSE",
+ "timeoutNotifyStrategy": None,
+ "timeout": 0,
+ },
+ )
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+@patch(
+ "pydolphinscheduler.core.engine.Engine.get_resource_info",
+ return_value=({"id": 1, "name": "mock_name"}),
+)
+def test_engine_get_define(mock_resource, mock_code_version, attr, expect):
+ """Test task engine function get_define."""
+ task = Engine(**attr)
+ assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
index 7fa4569..9473f57 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
@@ -119,6 +119,4 @@ def test_custom_datax_get_define(json_template):
return_value=(code, version),
):
task = CustomDataX(name, json_template)
- print(task.get_define())
- print(expect)
assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
index 743bdae..92ae3ba 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
@@ -23,7 +23,7 @@ from pydolphinscheduler.tasks.flink import DeployMode, Flink, FlinkVersion, Prog
@patch(
- "pydolphinscheduler.tasks.flink.Flink.get_resource_info",
+ "pydolphinscheduler.core.engine.Engine.get_resource_info",
return_value=({"id": 1, "name": "test"}),
)
def test_flink_get_define(mock_resource):
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
similarity index 76%
copy from dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
copy to dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
index 743bdae..3b0672f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
@@ -15,23 +15,23 @@
# specific language governing permissions and limitations
# under the License.
-"""Test Task Flink."""
+"""Test Task Spark."""
from unittest.mock import patch
-from pydolphinscheduler.tasks.flink import DeployMode, Flink, FlinkVersion, ProgramType
+from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark, SparkVersion
@patch(
- "pydolphinscheduler.tasks.flink.Flink.get_resource_info",
+ "pydolphinscheduler.core.engine.Engine.get_resource_info",
return_value=({"id": 1, "name": "test"}),
)
-def test_flink_get_define(mock_resource):
- """Test task flink function get_define."""
+def test_spark_get_define(mock_resource):
+ """Test task spark function get_define."""
code = 123
version = 1
- name = "test_flink_get_define"
- main_class = "org.apache.flink.test_main_class"
+ name = "test_spark_get_define"
+ main_class = "org.apache.spark.test_main_class"
main_package = "test_main_package"
program_type = ProgramType.JAVA
deploy_mode = DeployMode.LOCAL
@@ -42,7 +42,7 @@ def test_flink_get_define(mock_resource):
"version": 1,
"description": None,
"delayTime": 0,
- "taskType": "FLINK",
+ "taskType": "SPARK",
"taskParams": {
"mainClass": main_class,
"mainJar": {
@@ -50,12 +50,12 @@ def test_flink_get_define(mock_resource):
},
"programType": program_type,
"deployMode": deploy_mode,
- "flinkVersion": FlinkVersion.LOW_VERSION,
- "slot": 1,
- "parallelism": 1,
- "taskManager": 2,
- "jobManagerMemory": "1G",
- "taskManagerMemory": "2G",
+ "sparkVersion": SparkVersion.SPARK2,
+ "driverCores": 1,
+ "driverMemory": "512M",
+ "numExecutors": 2,
+ "executorMemory": "2G",
+ "executorCores": 2,
"appName": None,
"mainArgs": None,
"others": None,
@@ -78,5 +78,5 @@ def test_flink_get_define(mock_resource):
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
- task = Flink(name, main_class, main_package, program_type, deploy_mode)
+ task = Spark(name, main_class, main_package, program_type, deploy_mode)
assert task.get_define() == expect
diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index 9489894..2b06286 100644
--- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -477,7 +477,7 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
/**
* Get resource by given program type and full name. It return map contain resource id, name.
- * Useful in Python API create flink task which need processDefinition information.
+ * Useful when the Python API creates a flink or spark task, which needs processDefinition information.
*
* @param programType program type one of SCALA, JAVA and PYTHON
* @param fullName full name of the resource