You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/09/18 08:50:53 UTC
[dolphinscheduler] 01/05: [feat][python] Support Pytorch task in python api (#11975)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 6f3b4c162476ec27ae185e33241521032acb3db8
Author: JieguangZhou <ji...@163.com>
AuthorDate: Fri Sep 16 20:24:38 2022 +0800
[feat][python] Support Pytorch task in python api (#11975)
Co-authored-by: Jiajie Zhong <zh...@gmail.com>
(cherry picked from commit 5202e5cfc6a487a7e0897f52821d9bcf73e4cf82)
---
.../pydolphinscheduler/docs/source/tasks/index.rst | 1 +
.../docs/source/tasks/{index.rst => pytorch.rst} | 40 ++++---
.../examples/yaml_define/Pytorch.yaml | 53 +++++++++
.../src/pydolphinscheduler/constants.py | 1 +
.../examples/task_pytorch_example.py | 62 +++++++++++
.../src/pydolphinscheduler/tasks/__init__.py | 2 +
.../src/pydolphinscheduler/tasks/pytorch.py | 95 ++++++++++++++++
.../pydolphinscheduler/tests/tasks/test_pytorch.py | 124 +++++++++++++++++++++
8 files changed, 357 insertions(+), 21 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
index 30173f838b..a13652a526 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
@@ -42,3 +42,4 @@ In this section
sub_process
sagemaker
+ pytorch
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/pytorch.rst
similarity index 61%
copy from dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
copy to dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/pytorch.rst
index 30173f838b..4c7a5521fb 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/pytorch.rst
@@ -15,30 +15,28 @@
specific language governing permissions and limitations
under the License.
-Tasks
-=====
+Pytorch
+=======
-In this section
-.. toctree::
- :maxdepth: 1
-
- func_wrap
- shell
- sql
- python
- http
+An example of the Pytorch task type, followed by a deeper dive into its usage in **PyDolphinScheduler**.
- switch
- condition
- dependent
+Example
+-------
- spark
- flink
- map_reduce
- procedure
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_pytorch_example.py
+ :start-after: [start workflow_declare]
+ :end-before: [end workflow_declare]
- datax
- sub_process
+Dive Into
+---------
- sagemaker
+.. automodule:: pydolphinscheduler.tasks.pytorch
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Pytorch.yaml
+ :start-after: # under the License.
+ :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Pytorch.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Pytorch.yaml
new file mode 100644
index 0000000000..8706824245
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Pytorch.yaml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "Pytorch"
+
+# Define the tasks under the workflow
+tasks:
+
+ # run project with existing environment
+ - name: task_existing_env
+ task_type: pytorch
+ script: main.py
+ script_params: --dry-run --no-cuda
+ project_path: https://github.com/pytorch/examples#mnist
+ python_command: /home/anaconda3/envs/pytorch/bin/python3
+
+
+ # run the project in a newly created conda environment
+ - name: task_conda_env
+ task_type: pytorch
+ script: main.py
+ script_params: --dry-run --no-cuda
+ project_path: https://github.com/pytorch/examples#mnist
+ is_create_environment: True
+ python_env_tool: conda
+ requirements: requirements.txt
+ conda_python_version: 3.7
+
+ # run the project in a newly created virtualenv environment
+ - name: task_virtualenv_env
+ task_type: pytorch
+ script: main.py
+ script_params: --dry-run --no-cuda
+ project_path: https://github.com/pytorch/examples#mnist
+ is_create_environment: True
+ python_env_tool: virtualenv
+ requirements: requirements.txt
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 4544a6989d..7eb5d04210 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -58,6 +58,7 @@ class TaskType(str):
SPARK = "SPARK"
MR = "MR"
SAGEMAKER = "SAGEMAKER"
+ PYTORCH = "PYTORCH"
class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_pytorch_example.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_pytorch_example.py
new file mode 100644
index 0000000000..6559c9ac65
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_pytorch_example.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+"""An example workflow for task pytorch."""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.pytorch import Pytorch
+
+with ProcessDefinition(
+ name="task_pytorch_example",
+ tenant="tenant_exists",
+) as pd:
+
+ # run project with existing environment
+ task_existing_env = Pytorch(
+ name="task_existing_env",
+ script="main.py",
+ script_params="--dry-run --no-cuda",
+ project_path="https://github.com/pytorch/examples#mnist",
+ python_command="/home/anaconda3/envs/pytorch/bin/python3",
+ )
+
+ # run the project in a newly created conda environment
+ task_conda_env = Pytorch(
+ name="task_conda_env",
+ script="main.py",
+ script_params="--dry-run --no-cuda",
+ project_path="https://github.com/pytorch/examples#mnist",
+ is_create_environment=True,
+ python_env_tool="conda",
+ requirements="requirements.txt",
+ conda_python_version="3.7",
+ )
+
+ # run the project in a newly created virtualenv environment
+ task_virtualenv_env = Pytorch(
+ name="task_virtualenv_env",
+ script="main.py",
+ script_params="--dry-run --no-cuda",
+ project_path="https://github.com/pytorch/examples#mnist",
+ is_create_environment=True,
+ python_env_tool="virtualenv",
+ requirements="requirements.txt",
+ )
+
+ pd.submit()
+# [end workflow_declare]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
index 53b462ca90..1481722433 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
@@ -25,6 +25,7 @@ from pydolphinscheduler.tasks.http import Http
from pydolphinscheduler.tasks.map_reduce import MR
from pydolphinscheduler.tasks.procedure import Procedure
from pydolphinscheduler.tasks.python import Python
+from pydolphinscheduler.tasks.pytorch import Pytorch
from pydolphinscheduler.tasks.sagemaker import SageMaker
from pydolphinscheduler.tasks.shell import Shell
from pydolphinscheduler.tasks.spark import Spark
@@ -42,6 +43,7 @@ __all__ = [
"MR",
"Procedure",
"Python",
+ "Pytorch",
"Shell",
"Spark",
"Sql",
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/pytorch.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/pytorch.py
new file mode 100644
index 0000000000..4767f7ecee
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/pytorch.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Pytorch."""
+from typing import Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+
+
+class DEFAULT:
+ """Default values for Pytorch."""
+
+ is_create_environment = False
+ project_path = "."
+ python_command = "${PYTHON_HOME}"
+
+
+class Pytorch(Task):
+ """Task Pytorch object, declare behavior for Pytorch task to dolphinscheduler.
+
+ See also: `DolphinScheduler Pytorch Task Plugin
+ <https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/guide/task/pytorch.html>`_
+
+ :param name: task name
+ :param script: Entry to the Python script file that you want to run.
+ :param script_params: Input parameters at run time.
+ :param project_path: The path to the project. Default ".".
+ :param is_create_environment: Whether to create a new Python environment for the task. Default False.
+ :param python_command: The path to the python command. Default "${PYTHON_HOME}".
+ :param python_env_tool: The python environment tool. Default "conda".
+ :param requirements: The path to the requirements.txt file. Default "requirements.txt".
+ :param conda_python_version: The python version of conda environment. Default "3.7".
+ """
+
+ _task_custom_attr = {
+ "script",
+ "script_params",
+ "other_params",
+ "python_path",
+ "is_create_environment",
+ "python_command",
+ "python_env_tool",
+ "requirements",
+ "conda_python_version",
+ }
+
+ def __init__(
+ self,
+ name: str,
+ script: str,
+ script_params: str = "",
+ project_path: Optional[str] = DEFAULT.project_path,
+ is_create_environment: Optional[bool] = DEFAULT.is_create_environment,
+ python_command: Optional[str] = DEFAULT.python_command,
+ python_env_tool: Optional[str] = "conda",
+ requirements: Optional[str] = "requirements.txt",
+ conda_python_version: Optional[str] = "3.7",
+ *args,
+ **kwargs,
+ ):
+ """Init Pytorch task."""
+ super().__init__(name, TaskType.PYTORCH, *args, **kwargs)
+ self.script = script
+ self.script_params = script_params
+ self.is_create_environment = is_create_environment
+ self.python_path = project_path
+ self.python_command = python_command
+ self.python_env_tool = python_env_tool
+ self.requirements = requirements
+ self.conda_python_version = conda_python_version
+
+ @property
+ def other_params(self):
+ """Return other params."""
+ conds = [
+ self.is_create_environment != DEFAULT.is_create_environment,
+ self.python_path != DEFAULT.project_path,
+ self.python_command != DEFAULT.python_command,
+ ]
+ return any(conds)
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_pytorch.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_pytorch.py
new file mode 100644
index 0000000000..eccb51ca31
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_pytorch.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Pytorch."""
+from copy import deepcopy
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.tasks.pytorch import DEFAULT, Pytorch
+from tests.testing.task import Task
+
+CODE = 123
+VERSION = 1
+
+EXPECT = {
+ "code": CODE,
+ "version": VERSION,
+ "description": None,
+ "delayTime": 0,
+ "taskType": "PYTORCH",
+ "taskParams": {
+ "resourceList": [],
+ "localParams": [],
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
+ },
+ "flag": "YES",
+ "taskPriority": "MEDIUM",
+ "workerGroup": "default",
+ "environmentCode": None,
+ "failRetryTimes": 0,
+ "failRetryInterval": 1,
+ "timeoutFlag": "CLOSE",
+ "timeoutNotifyStrategy": None,
+ "timeout": 0,
+}
+
+
+def test_pytorch_get_define():
+ """Test task pytorch function get_define."""
+ name = "task_conda_env"
+ script = "main.py"
+ script_params = "--dry-run --no-cuda"
+ project_path = "https://github.com/pytorch/examples#mnist"
+ is_create_environment = True
+ python_env_tool = "conda"
+ requirements = "requirements.txt"
+ conda_python_version = "3.7"
+
+ expect = deepcopy(EXPECT)
+ expect["name"] = name
+ task_params = expect["taskParams"]
+
+ task_params["script"] = script
+ task_params["scriptParams"] = script_params
+ task_params["pythonPath"] = project_path
+ task_params["otherParams"] = True
+ task_params["isCreateEnvironment"] = is_create_environment
+ task_params["pythonCommand"] = "${PYTHON_HOME}"
+ task_params["pythonEnvTool"] = python_env_tool
+ task_params["requirements"] = requirements
+ task_params["condaPythonVersion"] = conda_python_version
+
+ with patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(CODE, VERSION),
+ ):
+ task = Pytorch(
+ name=name,
+ script=script,
+ script_params=script_params,
+ project_path=project_path,
+ is_create_environment=is_create_environment,
+ python_env_tool=python_env_tool,
+ requirements=requirements,
+ )
+ assert task.get_define() == expect
+
+
+@pytest.mark.parametrize(
+ "is_create_environment, project_path, python_command, expect",
+ [
+ (
+ DEFAULT.is_create_environment,
+ DEFAULT.project_path,
+ DEFAULT.python_command,
+ False,
+ ),
+ (True, DEFAULT.project_path, DEFAULT.python_command, True),
+ (DEFAULT.is_create_environment, "/home", DEFAULT.python_command, True),
+ (DEFAULT.is_create_environment, DEFAULT.project_path, "/usr/bin/python", True),
+ ],
+)
+def test_other_params(is_create_environment, project_path, python_command, expect):
+ """Test task pytorch function other_params."""
+ with patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
+ ):
+ task = Pytorch(
+ name="test",
+ script="",
+ script_params="",
+ project_path=project_path,
+ is_create_environment=is_create_environment,
+ python_command=python_command,
+ )
+ assert task.other_params == expect