You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/09/18 08:50:53 UTC

[dolphinscheduler] 01/05: [feat][python] Support Pytorch task in python api (#11975)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 6f3b4c162476ec27ae185e33241521032acb3db8
Author: JieguangZhou <ji...@163.com>
AuthorDate: Fri Sep 16 20:24:38 2022 +0800

    [feat][python] Support Pytorch task in python api (#11975)
    
    Co-authored-by: Jiajie Zhong <zh...@gmail.com>
    (cherry picked from commit 5202e5cfc6a487a7e0897f52821d9bcf73e4cf82)
---
 .../pydolphinscheduler/docs/source/tasks/index.rst |   1 +
 .../docs/source/tasks/{index.rst => pytorch.rst}   |  40 ++++---
 .../examples/yaml_define/Pytorch.yaml              |  53 +++++++++
 .../src/pydolphinscheduler/constants.py            |   1 +
 .../examples/task_pytorch_example.py               |  62 +++++++++++
 .../src/pydolphinscheduler/tasks/__init__.py       |   2 +
 .../src/pydolphinscheduler/tasks/pytorch.py        |  95 ++++++++++++++++
 .../pydolphinscheduler/tests/tasks/test_pytorch.py | 124 +++++++++++++++++++++
 8 files changed, 357 insertions(+), 21 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
index 30173f838b..a13652a526 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
@@ -42,3 +42,4 @@ In this section
    sub_process
 
    sagemaker
+   pytorch
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/pytorch.rst
similarity index 61%
copy from dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
copy to dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/pytorch.rst
index 30173f838b..4c7a5521fb 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/pytorch.rst
@@ -15,30 +15,28 @@
    specific language governing permissions and limitations
    under the License.
 
-Tasks
-=====
+Pytorch
+=======
 
-In this section 
 
-.. toctree::
-   :maxdepth: 1
-   
-   func_wrap
-   shell
-   sql
-   python
-   http
+An example of the Pytorch task type, plus detailed information about it in **PyDolphinScheduler**.
 
-   switch
-   condition
-   dependent
+Example
+-------
 
-   spark
-   flink
-   map_reduce
-   procedure
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_pytorch_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
 
-   datax
-   sub_process
+Dive Into
+---------
 
-   sagemaker
+.. automodule:: pydolphinscheduler.tasks.pytorch
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Pytorch.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Pytorch.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Pytorch.yaml
new file mode 100644
index 0000000000..8706824245
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Pytorch.yaml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "Pytorch"
+
+# Define the tasks under the workflow
+tasks:
+
+  # run project with an existing Python environment
+  - name: task_existing_env
+    task_type: pytorch
+    script: main.py
+    script_params: --dry-run --no-cuda
+    project_path: https://github.com/pytorch/examples#mnist
+    python_command: /home/anaconda3/envs/pytorch/bin/python3
+
+
+  # run project in a newly created conda environment
+  - name: task_conda_env
+    task_type: pytorch
+    script: main.py
+    script_params: --dry-run --no-cuda
+    project_path: https://github.com/pytorch/examples#mnist
+    is_create_environment: True
+    python_env_tool: conda
+    requirements: requirements.txt
+    conda_python_version: "3.7"  # quoted: unquoted 3.7 is a YAML float (3.10 would become 3.1)
+
+  # run project in a newly created virtualenv environment
+  - name: task_virtualenv_env
+    task_type: pytorch
+    script: main.py
+    script_params: --dry-run --no-cuda
+    project_path: https://github.com/pytorch/examples#mnist
+    is_create_environment: True
+    python_env_tool: virtualenv
+    requirements: requirements.txt
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 4544a6989d..7eb5d04210 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -58,6 +58,7 @@ class TaskType(str):
     SPARK = "SPARK"
     MR = "MR"
     SAGEMAKER = "SAGEMAKER"
+    PYTORCH = "PYTORCH"
 
 
 class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_pytorch_example.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_pytorch_example.py
new file mode 100644
index 0000000000..6559c9ac65
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_pytorch_example.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+"""An example workflow for task pytorch."""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.pytorch import Pytorch
+
+with ProcessDefinition(
+    name="task_pytorch_example",
+    tenant="tenant_exists",  # NOTE(review): presumably an existing tenant; adjust for your cluster
+) as pd:
+
+    # run project with an existing Python environment given by python_command
+    task_existing_env = Pytorch(
+        name="task_existing_env",
+        script="main.py",
+        script_params="--dry-run --no-cuda",
+        project_path="https://github.com/pytorch/examples#mnist",
+        python_command="/home/anaconda3/envs/pytorch/bin/python3",
+    )
+
+    # run project in a newly created conda environment
+    task_conda_env = Pytorch(
+        name="task_conda_env",
+        script="main.py",
+        script_params="--dry-run --no-cuda",
+        project_path="https://github.com/pytorch/examples#mnist",
+        is_create_environment=True,
+        python_env_tool="conda",
+        requirements="requirements.txt",
+        conda_python_version="3.7",
+    )
+
+    # run project in a newly created virtualenv environment
+    task_virtualenv_env = Pytorch(
+        name="task_virtualenv_env",
+        script="main.py",
+        script_params="--dry-run --no-cuda",
+        project_path="https://github.com/pytorch/examples#mnist",
+        is_create_environment=True,
+        python_env_tool="virtualenv",
+        requirements="requirements.txt",
+    )
+
+    pd.submit()
+# [end workflow_declare]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
index 53b462ca90..1481722433 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
@@ -25,6 +25,7 @@ from pydolphinscheduler.tasks.http import Http
 from pydolphinscheduler.tasks.map_reduce import MR
 from pydolphinscheduler.tasks.procedure import Procedure
 from pydolphinscheduler.tasks.python import Python
+from pydolphinscheduler.tasks.pytorch import Pytorch
 from pydolphinscheduler.tasks.sagemaker import SageMaker
 from pydolphinscheduler.tasks.shell import Shell
 from pydolphinscheduler.tasks.spark import Spark
@@ -42,6 +43,7 @@ __all__ = [
     "MR",
     "Procedure",
     "Python",
+    "Pytorch",
     "Shell",
     "Spark",
     "Sql",
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/pytorch.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/pytorch.py
new file mode 100644
index 0000000000..4767f7ecee
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/pytorch.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Pytorch."""
+from typing import Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+
+
+class DEFAULT:
+    """Default values for Pytorch; any non-default one makes other_params True."""
+
+    is_create_environment = False
+    project_path = "."
+    python_command = "${PYTHON_HOME}"  # placeholder, presumably resolved on the worker — TODO confirm
+
+
+class Pytorch(Task):
+    """Task Pytorch object, declaring a PyTorch task for DolphinScheduler.
+
+    See also: `DolphinScheduler Pytorch Task Plugin
+    <https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/guide/task/pytorch.html>`_
+
+    :param name: Task name.
+    :param script: Entry Python script of the project to run, e.g. "main.py".
+    :param script_params: Command-line parameters passed to the script. Default "".
+    :param project_path: Path or URL of the project, e.g. a git URL. Default ".".
+    :param is_create_environment: Whether to create a new Python environment. Default False.
+    :param python_command: Python interpreter used to run the script. Default "${PYTHON_HOME}".
+    :param python_env_tool: Tool that creates the environment, e.g. "virtualenv". Default "conda".
+    :param requirements: Path to the requirements file. Default "requirements.txt".
+    :param conda_python_version: Python version of the created conda env. Default "3.7".
+    """
+
+    _task_custom_attr = {
+        "script",
+        "script_params",
+        "other_params",
+        "python_path",  # holds the project_path argument (serialized as "pythonPath")
+        "is_create_environment",
+        "python_command",
+        "python_env_tool",
+        "requirements",
+        "conda_python_version",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        script: str,
+        script_params: str = "",
+        project_path: Optional[str] = DEFAULT.project_path,
+        is_create_environment: Optional[bool] = DEFAULT.is_create_environment,
+        python_command: Optional[str] = DEFAULT.python_command,
+        python_env_tool: Optional[str] = "conda",
+        requirements: Optional[str] = "requirements.txt",
+        conda_python_version: Optional[str] = "3.7",
+        *args,
+        **kwargs,
+    ):
+        """Init Pytorch task."""
+        super().__init__(name, TaskType.PYTORCH, *args, **kwargs)
+        self.script = script
+        self.script_params = script_params
+        self.is_create_environment = is_create_environment
+        self.python_path = project_path  # name matches the "python_path" custom attr
+        self.python_command = python_command
+        self.python_env_tool = python_env_tool
+        self.requirements = requirements
+        self.conda_python_version = conda_python_version
+
+    @property
+    def other_params(self):
+        """Return True if any environment/path option differs from DEFAULT."""
+        conds = [
+            self.is_create_environment != DEFAULT.is_create_environment,
+            self.python_path != DEFAULT.project_path,
+            self.python_command != DEFAULT.python_command,
+        ]
+        return any(conds)
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_pytorch.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_pytorch.py
new file mode 100644
index 0000000000..eccb51ca31
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_pytorch.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Pytorch."""
+from copy import deepcopy
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.tasks.pytorch import DEFAULT, Pytorch
+from tests.testing.task import Task
+
+CODE = 123  # code/version returned by the patched Task.gen_code_and_version
+VERSION = 1
+
+EXPECT = {  # base definition; test_pytorch_get_define adds "name" and task params
+    "code": CODE,
+    "version": VERSION,
+    "description": None,
+    "delayTime": 0,
+    "taskType": "PYTORCH",
+    "taskParams": {
+        "resourceList": [],
+        "localParams": [],
+        "dependence": {},
+        "conditionResult": {"successNode": [""], "failedNode": [""]},
+        "waitStartTimeout": {},
+    },
+    "flag": "YES",
+    "taskPriority": "MEDIUM",
+    "workerGroup": "default",
+    "environmentCode": None,
+    "failRetryTimes": 0,
+    "failRetryInterval": 1,
+    "timeoutFlag": "CLOSE",
+    "timeoutNotifyStrategy": None,
+    "timeout": 0,
+}
+
+
+def test_pytorch_get_define():
+    """Test Pytorch.get_define serializes all params for a conda environment task."""
+    name = "task_conda_env"
+    script = "main.py"
+    script_params = "--dry-run --no-cuda"
+    project_path = "https://github.com/pytorch/examples#mnist"
+    is_create_environment = True
+    python_env_tool = "conda"
+    requirements = "requirements.txt"
+    conda_python_version = "3.7"  # not passed to Pytorch below; equals the constructor default
+
+    expect = deepcopy(EXPECT)
+    expect["name"] = name
+    task_params = expect["taskParams"]
+
+    task_params["script"] = script
+    task_params["scriptParams"] = script_params
+    task_params["pythonPath"] = project_path
+    task_params["otherParams"] = True
+    task_params["isCreateEnvironment"] = is_create_environment
+    task_params["pythonCommand"] = "${PYTHON_HOME}"
+    task_params["pythonEnvTool"] = python_env_tool
+    task_params["requirements"] = requirements
+    task_params["condaPythonVersion"] = conda_python_version
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(CODE, VERSION),
+    ):
+        task = Pytorch(
+            name=name,
+            script=script,
+            script_params=script_params,
+            project_path=project_path,
+            is_create_environment=is_create_environment,
+            python_env_tool=python_env_tool,
+            requirements=requirements,
+        )
+        assert task.get_define() == expect
+
+
+@pytest.mark.parametrize(
+    "is_create_environment, project_path, python_command, expect",
+    [
+        (
+            DEFAULT.is_create_environment,
+            DEFAULT.project_path,
+            DEFAULT.python_command,
+            False,
+        ),
+        (True, DEFAULT.project_path, DEFAULT.python_command, True),
+        (DEFAULT.is_create_environment, "/home", DEFAULT.python_command, True),
+        (DEFAULT.is_create_environment, DEFAULT.project_path, "/usr/bin/python", True),
+    ],
+)
+def test_other_params(is_create_environment, project_path, python_command, expect):
+    """Test other_params is True only when some option differs from DEFAULT."""
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
+    ):
+        task = Pytorch(
+            name="test",
+            script="",
+            script_params="",
+            project_path=project_path,
+            is_create_environment=is_create_environment,
+            python_command=python_command,
+        )
+        assert task.other_params == expect