You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/08/18 02:20:46 UTC

[dolphinscheduler] branch dev updated: [Feature] New task plugin Pytorch (#11498)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new a974ba74ab [Feature] New task plugin Pytorch (#11498)
a974ba74ab is described below

commit a974ba74ab73fd17e72195e93617b957cd0e20fd
Author: JieguangZhou <ji...@163.com>
AuthorDate: Thu Aug 18 10:20:37 2022 +0800

    [Feature] New task plugin Pytorch (#11498)
    
    Co-authored-by: Eric Gao <er...@gmail.com>
---
 docs/configs/docsdev.js                            |   8 +
 docs/docs/en/guide/task/pytorch.md                 | 118 ++++++++++
 docs/docs/zh/guide/task/pytorch.md                 | 112 +++++++++
 docs/img/tasks/demo/pytorch_en.png                 | Bin 0 -> 45966 bytes
 docs/img/tasks/demo/pytorch_note_en.png            | Bin 0 -> 103456 bytes
 docs/img/tasks/icons/pytorch.png                   | Bin 0 -> 7782 bytes
 .../dolphinscheduler-task-all/pom.xml              |   6 +
 .../dolphinscheduler-task-pytorch/pom.xml          |  49 ++++
 .../plugin/task/pytorch/GitProjectManager.java     |  82 +++++++
 .../plugin/task/pytorch/PythonEnvManager.java      |  77 +++++++
 .../plugin/task/pytorch/PytorchParameters.java     |  96 ++++++++
 .../plugin/task/pytorch/PytorchTask.java           | 132 +++++++++++
 .../plugin/task/pytorch/PytorchTaskChannel.java    |  49 ++++
 .../task/pytorch/PytorchTaskChannelFactory.java    |  45 ++++
 .../plugin/task/pytorch/PytorchTaskTest.java       | 252 +++++++++++++++++++++
 dolphinscheduler-task-plugin/pom.xml               |   1 +
 .../public/images/task-icons/pytorch.png           | Bin 0 -> 7782 bytes
 .../public/images/task-icons/pytorch_hover.png     | Bin 0 -> 92324 bytes
 dolphinscheduler-ui/src/locales/en_US/project.ts   |  13 +-
 dolphinscheduler-ui/src/locales/zh_CN/project.ts   |  13 +-
 .../projects/task/components/node/fields/index.ts  |   1 +
 .../task/components/node/fields/use-pytorch.ts     | 138 +++++++++++
 .../projects/task/components/node/format-data.ts   |  10 +
 .../projects/task/components/node/tasks/index.ts   |   4 +-
 .../task/components/node/tasks/use-pytorch.ts      |  88 +++++++
 .../views/projects/task/components/node/types.ts   |   8 +
 .../src/views/projects/task/constants/task-type.ts |   5 +
 .../workflow/components/dag/dag.module.scss        |   6 +
 28 files changed, 1310 insertions(+), 3 deletions(-)

diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js
index fcb1eead67..b69a7f3354 100644
--- a/docs/configs/docsdev.js
+++ b/docs/configs/docsdev.js
@@ -189,6 +189,10 @@ export default {
                                 title: 'ChunJun',
                                 link: '/en-us/docs/dev/user_doc/guide/task/chunjun.html',
                             },
+                            {
+                                title: 'Pytorch',
+                                link: '/en-us/docs/dev/user_doc/guide/task/pytorch.html',
+                            },
                         ],
                     },
                     {
@@ -805,6 +809,10 @@ export default {
                                 title: 'ChunJun',
                                 link: '/zh-cn/docs/dev/user_doc/guide/task/chunjun.html',
                             },
+                            {
+                                title: 'Pytorch',
+                                link: '/zh-cn/docs/dev/user_doc/guide/task/pytorch.html',
+                            },
                         ],
                     },
                     {
diff --git a/docs/docs/en/guide/task/pytorch.md b/docs/docs/en/guide/task/pytorch.md
new file mode 100644
index 0000000000..b4bb78008b
--- /dev/null
+++ b/docs/docs/en/guide/task/pytorch.md
@@ -0,0 +1,118 @@
+# Pytorch Node (experimental)
+
+## Overview
+
+[Pytorch](https://pytorch.org) is a mainstream Python machine learning library.
+
+`Pytorch` task plugin enables users to run `Pytorch` projects in DolphinScheduler more conveniently. In addition, it supports handy Python environment management.
+
+`Pytorch task plugin` is more than `Python task plugin`, which allows users to **conveniently use existing Python environments or create new ones (using `Virtualenv` or `Conda`)**. **It supports running Python projects (native or Git projects)** instead of just Python scripts.
+
+## Create Task
+
+- Click `Project Management -> Project Name -> Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page.
+- Drag <img src="../../../../img/tasks/icons/pytorch.png" width="15"/> from the toolbar to the canvas.
+
+## Task Example
+
+The task plugin picture is as follows
+
+![pytorch](../../../../img/tasks/demo/pytorch_en.png)
+
+
+First, introduce some general parameters of DolphinScheduler:
+
+- **Node name**: The node name in a workflow definition is unique.
+- **Run flag**: Identifies whether this node schedules normally, if it does not need to execute, select
+  the `prohibition execution`.
+- **Descriptive information**: Describe the function of the node.
+- **Task priority**: When the number of worker threads is insufficient, execute in the order of priority from high
+  to low, and tasks with the same priority will execute in a first-in first-out order.
+- **Worker grouping**: Assign tasks to the machines of the worker group to execute. If `Default` is selected,
+  randomly select a worker machine for execution.
+- **Environment Name**: Configure the environment name in which run the script.
+- **Times of failed retry attempts**: The number of times the task failed to resubmit.
+- **Failed retry interval**: The time interval (unit minute) for resubmitting the task after a failed task.
+- **Delayed execution time**: The time (unit minute) that a task delays in execution.
+- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm
+  email will send and the task execution will fail.
+- **Resource**: Refers to the list of resource files that need to be called in the script, and the files uploaded or created in Resource Center - File Management.
+- **User-defined parameters**: It is a user-defined parameter of Shell, which will replace the content with `${variable}` in the script.
+- **Predecessor task**: Selecting a predecessor task for the current task, will set the selected predecessor task as
+  upstream of the current task.
+
+
+Here are some specific parameters for the Pytorch plugin:
+
+#### Run time parameters
+
+- **Python Script** :Entry to the Python script file that you want to run.
+- **Script Input Parameters** :Input parameters at run time.
+
+The preceding two parameters are used to minimize the running of the configuration. Additional configuration parameters are provided as follows. When you choose to expand more configurations, you can configure more parameters
+
+- **Project Path** :Set environment variable `PYTHONPATH` to load the Python package/project code at this path when running Python scripts。Support for local paths or Git URL.
+  - If it is a local path, it is used as environment variable `PYTHONPATH`.
+  - If the GIT URL (`git @ | https:// | http://` prefix), can download the project, and the path after downloading will be set as new `Project Path`, if need to run a folder under the project, you can add `#subdirectory` after GIT URL.
+
+#### Python environment parameters
+
+- **Create An Environment Or Not** :Whether to create a new Python environment to run the task.
+
+*no*
+
+- **Python Command Path** :Such as `/usr/bin/python`,The default value is ${PYTHON_HOME} in environment.
+
+*yes*
+
+- **Python Environment Manager Tool** :You can choose `virtualenv` or `conda`.
+  - if choose `virtualenv`,that may use `virtualenv` to create a new environment. Use command `virtualenv -p ${PYTHON_HOME} venv`.
+  - if choose `conda`, ,that may use `conda` to create a new environment,And you need to specify the Python version.
+- **Requirement File** :The defualt is requirements.txt。
+
+
+We can use relative paths of `Python Script` and `Requirement File` if we set `Project Path` which contains the python script or required requirement file.
+
+#### Demo
+
+Now if we want to run the mnist subproject under `https://github.com/pytorch/examples`.
+
+We can run task like below:
+
+![pytorch_note](../../../../img/tasks/demo/pytorch_note_en.png)
+
+
+In addition, if the code is stored in the `Resource`, you can use the `Resource` parameter to download the code, and write the related parameters into the path of the corresponding resource.
+
+## Environment configuration
+
+The environment configuration mainly depends on the choice of runtime Python environment. You need to configure the corresponding environment variables in the `Security` - `Environment Manage`.
+
+### Specifying a Python path
+
+It is applicable to the Python environment where the project has been run on the worker, so you can directly configure the `Python Command Path` as the corresponding Python environment in the component. If you do not know the environment address, you can use `which python` to obtain it.
+
+### Create a new environment using `Conda`
+
+It applies to a new environment to run the project. You need to create an environment in `Security` - `Environment Manage`. You can change the environment to the actual environment by referring to the following.
+
+```shell
+# Add the directory for the conda command
+export PATH=$HOME/anaconda3/bin:$PATH
+```
+
+### Create a new environment using `Virtualenv`
+
+It applies to a new environment to run the project. You need to create an environment in `Security` - `Environment Manage`. You can change the environment to the actual environment by referring to the following.
+
+```shell
+# Add the directory for the virtualenv command
+export PATH=/home/xxx/anaconda3/bin:$PATH
+export PYTHON_HOME=/usr/local/bin/python3.7
+```
+
+## Other
+
+This task plugin can also run XGBoost, LightGBM, SkLearn, TensorFlow, Keras and other projects. This task plugin is available as an upgrade to Python task plugin running machine learning tasks.
+
+If necessary, we can call this task plugin `PythonML`,which can run machine learning projects easily in DolphinScheduler.
diff --git a/docs/docs/zh/guide/task/pytorch.md b/docs/docs/zh/guide/task/pytorch.md
new file mode 100644
index 0000000000..4dcc6362e4
--- /dev/null
+++ b/docs/docs/zh/guide/task/pytorch.md
@@ -0,0 +1,112 @@
+# Pytorch 节点(试验版)
+
+## 综述
+
+[Pytorch](https://pytorch.org) 是一个的主流Python机器学习库。
+
+为了用户能够在DolphinScheduler中**更方便的运行Pytorch项目**,实现了Pytorch任务组件。主要提供**便捷的python环境管理**以及支持**运行python项目**。
+
+与Python任务组件不同,该组件允许用户快速使用已有python环境或者创建新的python环境(使用virtualenv或者conda);支持运行Python项目(本地项目或者Git项目)而非只是python脚本。
+
+## 创建任务
+
+- 点击项目管理-项目名称-工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面;
+- 拖动工具栏的 <img src="../../../../img/tasks/icons/pytorch.png" width="15"/> 任务节点到画板中。
+
+
+## 任务样例
+
+组件图示如下:
+
+![pytorch](../../../../img/tasks/demo/pytorch_en.png)
+
+### 首先介绍一些DS通用参数
+
+- **节点名称** :设置任务的名称。一个工作流定义中的节点名称是唯一的。
+- **运行标志** :标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
+- **描述** :描述该节点的功能。
+- **任务优先级** :worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
+- **Worker 分组** :任务分配给 worker 组的机器执行,选择 Default,会随机选择一台 worker 机执行。
+- **环境名称** :配置运行脚本的环境。
+- **失败重试次数** :任务失败重新提交的次数。
+- **失败重试间隔** :任务失败重新提交任务的时间间隔,以分钟为单位。
+- **延迟执行时间** :任务延迟执行的时间,以分钟为单位。
+- **资源**:是指脚本中需要调用的资源文件列表,资源中心-文件管理上传或创建的文件。
+- **自定义参数**:是 SHELL 局部的用户自定义参数,会替换脚本中以 `${变量}` 的内容。
+- **前置任务** :选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
+
+
+### Pytorch组件独有的参数
+
+#### 运行参数
+
+- **python脚本** :需要运行的python脚本文件入口。
+- **脚本启动参数** :运行时的输入参数。
+
+以上为两个最小化配置运行的参数,另外提供其他的一些配置参数如下可选,当选择展开更多配置时,可以配置更多参数。
+
+- **python项目地址** :设置`PYTHONPATH`环境变量,设置后运行python脚本时可以加载该地址下的python包/项目代码。支持本地路径或者Git url
+  - 若为本地路径,作为`PYTHONPATH`环境变量。
+  - 如果为Git URL (以`git@ | https:// | http:// `前缀),则会下载项目,并将下载后存放地址作为新的**python项目地址**,若需要运行子文件夹下的项目,可以添加 `#subdirectory` 来配置。
+
+#### python环境参数
+
+- **是否创建新环境** :是否创建新的python环境来运行该任务。
+
+*否*
+
+- **python命令路径** :如`/usr/bin/python`,默认为DS环境配置中的`${PYTHON_HOME}`。
+
+*是*
+
+- **python环境管理工具** :可以选择virtualenv或者conda。
+  - 若选择`virtualenv`,则会用`virtualenv`创建一个新环境,使用命令 `virtualenv -p ${PYTHON_HOME} venv` 创建
+  - 若选择`conda`, 则会使用`conda` 创建一个新环境,并需要指定创建的python版本。
+- **依赖文件** :默认为 requirements.txt。
+
+
+配置了`python项目地址`参数,那么`python脚本`和`依赖文件`参数允许输入相对路径
+
+#### Demo
+
+如现在需要运行 https://github.com/pytorch/examples 项目下的mnist的子项目。
+
+可以设置
+
+![pytorch_note](../../../../img/tasks/demo/pytorch_note_en.png)
+
+
+另外如果代码存放在资源中心,则可以使用`资源`参数下载代码,并将相关参数写成对应资源的路径即可。
+
+## 环境配置
+
+环境配置主要取决于运行时python环境的选择,需要在`安全中心`-`环境管理`中配置对应需要的环境变量即可。
+
+### 指定python路径
+
+适用于worker上已经有运行该项目的python环境,那么可以直接在组件中配置`pyhton命令路径`为对应的python环境即可,如果不知道该环境地址,可以使用`which python`获取。
+
+### 使用Conda创建新环境
+
+适用于新建环境运行该项目,需要在`安全中心`-`环境管理`中创建环境, 参考如下添加修改为实际环境即可。
+```shell
+# conda命令对应的目录加入PATH中
+export PATH=$HOME/anaconda3/bin:$PATH
+```
+
+### 使用virtualenv创建新环境
+
+适用于新建环境运行该项目,需要在`安全中心`-`环境管理`中创建环境, 参考如下添加修改为实际环境即可。
+
+```shell
+# virtualenv命令对应的目录加入PATH中
+export PATH=/home/lucky/anaconda3/bin:$PATH
+export PYTHON_HOME=/usr/local/bin/python3.7
+```
+
+
+## 其他
+
+本组件也可以运行xgboost, lightgbm, sklearn, tensorflow, keras 等项目。本组件可作为python组件运行机器学习任务的升级组件。
+
+如果有需要,后续建议可以统一涵盖为PythonML组件,来运行机器学习项目。
\ No newline at end of file
diff --git a/docs/img/tasks/demo/pytorch_en.png b/docs/img/tasks/demo/pytorch_en.png
new file mode 100644
index 0000000000..46be1ec223
Binary files /dev/null and b/docs/img/tasks/demo/pytorch_en.png differ
diff --git a/docs/img/tasks/demo/pytorch_note_en.png b/docs/img/tasks/demo/pytorch_note_en.png
new file mode 100644
index 0000000000..088beca3b1
Binary files /dev/null and b/docs/img/tasks/demo/pytorch_note_en.png differ
diff --git a/docs/img/tasks/icons/pytorch.png b/docs/img/tasks/icons/pytorch.png
new file mode 100644
index 0000000000..e455a134f1
Binary files /dev/null and b/docs/img/tasks/icons/pytorch.png differ
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
index 84ad0169ac..fd1111d658 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
@@ -195,6 +195,12 @@
             <artifactId>dolphinscheduler-task-sagemaker</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-pytorch</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/pom.xml
new file mode 100644
index 0000000000..0a0107d0e3
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dolphinscheduler-task-plugin</artifactId>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dolphinscheduler-task-pytorch</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-spi</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/GitProjectManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/GitProjectManager.java
new file mode 100644
index 0000000000..d65e5e1150
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/GitProjectManager.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.pytorch;
+
+import static org.apache.dolphinscheduler.plugin.task.api.AbstractShell.ExitCodeException;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.Data;
+
+@Data
+public class GitProjectManager {
+    public static final String GIT_PATH_LOCAL = "GIT_PROJECT";
+    private static final Pattern GIT_CHECK_PATTERN = Pattern.compile("^(git@|https?://)");
+    protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
+    private String path;
+    private String baseDir = ".";
+
+    public static boolean isGitPath(String path) {
+        return GIT_CHECK_PATTERN.matcher(path).find();
+    }
+
+    public void prepareProject() throws Exception {
+        String savePath = Paths.get(baseDir, GIT_PATH_LOCAL).toString();
+
+        logger.info("clone project {} to {}", path, savePath);
+        String[] command = {"sh", "-c", String.format("git clone %s %s", getGitUrl(), savePath)};
+        try {
+            OSUtils.exeShell(command);
+        } catch (ExitCodeException e) {
+            if (!new File(savePath).exists()) {
+                throw e;
+            }
+        }
+        logger.info("clone project done");
+    }
+
+    public String getGitUrl() {
+        String gitUrl;
+        if (path.contains("#")) {
+            gitUrl = path.split("#")[0];
+        } else {
+            gitUrl = path;
+        }
+        return gitUrl;
+
+    }
+
+    public String getGitLocalPath() {
+        String gitLocalPath;
+        if (path.contains("#")) {
+            gitLocalPath = Paths.get(GIT_PATH_LOCAL, path.split("#")[1]).toString();
+        } else {
+            gitLocalPath = GIT_PATH_LOCAL;
+        }
+        return Paths.get(baseDir, gitLocalPath).toString();
+
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PythonEnvManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PythonEnvManager.java
new file mode 100644
index 0000000000..d13e234c33
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PythonEnvManager.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.pytorch;
+
+import lombok.Data;
+
+@Data
+public class PythonEnvManager {
+
+    public static final String ENV_TOOL_VENV = "virtualenv";
+
+    public static final String ENV_TOOL_CONDA = "conda";
+
+    private static final String PATTERN_ENVIRONMENT_PYTHON = "python[\\d\\.]*$";
+
+    private static final String PATTERN_ENVIRONMENT_REQUIREMENT = "\\.txt$";
+
+    private static final String CREATE_ENV_NAME = "./venv";
+
+    private static final String CONDA_SOURCE = "source activate %s";
+
+    private static final String CONDA_BUILD = "conda create -y python=%s -p %s";
+
+    private static final String VIRTUALENV_SOURCE = "source %s/bin/activate";
+
+    private static final String VIRTUALENV_BUILD = "virtualenv -p ${PYTHON_HOME} %s";
+
+    private static final String INSTALL_COMMAND = "python -m pip install -r %s";
+
+    private String pythonEnvTool = ENV_TOOL_VENV;
+
+    private String condaPythonVersion = "3.9";
+
+    public String getBuildEnvCommand(String requirementPath) {
+        String buildCommand = "";
+        String sourceCommand = getSourceEnvCommand(CREATE_ENV_NAME);
+        if (pythonEnvTool.equals(ENV_TOOL_VENV)) {
+            buildCommand = String.format(VIRTUALENV_BUILD, CREATE_ENV_NAME);
+        } else if (pythonEnvTool.equals(ENV_TOOL_CONDA)) {
+            buildCommand = String.format(CONDA_BUILD, condaPythonVersion, CREATE_ENV_NAME);
+        }
+        String installCommand = String.format(INSTALL_COMMAND, requirementPath);
+
+        return buildCommand + " && " + sourceCommand + " && " + installCommand;
+    }
+
+    private String getSourceEnvCommand(String envName) {
+        String command = "";
+        if (pythonEnvTool.equals(ENV_TOOL_VENV)) {
+            command = String.format(VIRTUALENV_SOURCE, envName);
+        } else if (pythonEnvTool.equals(ENV_TOOL_CONDA)) {
+            command = String.format(CONDA_SOURCE, envName);
+        }
+
+        return command;
+    }
+
+    public String getPythonCommand() {
+        return String.format("%s/bin/python", CREATE_ENV_NAME);
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchParameters.java
new file mode 100644
index 0000000000..cbac9b548e
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchParameters.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.pytorch;
+
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.List;
+
+import lombok.Data;
+
+@Data
+public class PytorchParameters extends AbstractParameters {
+
+    private Boolean isCreateEnvironment = false;
+    private String pythonPath = ".";
+    private String script;
+    private String scriptParams;
+    private String pythonCommand = "${PYTHON_HOME}";
+    private String pythonEnvTool = PythonEnvManager.ENV_TOOL_VENV;
+    private String requirements = "requirements.txt";
+    private String condaPythonVersion = "3.9";
+    /**
+     * resource list
+     */
+    private List<ResourceInfo> resourceList;
+
+    @Override
+    public List<ResourceInfo> getResourceFilesList() {
+        return resourceList;
+    }
+
+    @Override
+    public boolean checkParameters() {
+        return true;
+    }
+
+
+    public String getRequirementPath() {
+        return getPossiblePath(requirements);
+    }
+
+    public String getPythonCommand() {
+        String command;
+        if (pythonCommand.isEmpty()) {
+            command = "${PYTHON_HOME}";
+        } else {
+            command = pythonCommand;
+        }
+        return command;
+    }
+
+
+    public String getScriptPath() {
+        return getPossiblePath(script);
+    }
+
+    private String getPossiblePath(String filePath) {
+        String possiblePath = filePath;
+        File sourceFile = new File(possiblePath);
+        String newPath = Paths.get(pythonPath, possiblePath).toString();
+        File newFile = new File(newPath);
+        if (newFile.exists() && !sourceFile.exists()) {
+            possiblePath = newPath;
+        } else if (resourceList != null) {
+            String newPathResource = StringUtils.removeStart(newPath, "./");
+            for (ResourceInfo resourceInfo : resourceList) {
+                if (resourceInfo.getResourceName().equals("/" + newPathResource)) {
+                    possiblePath = newPath;
+                    break;
+                }
+            }
+        }
+        return possiblePath;
+
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
new file mode 100644
index 0000000000..b738194d4e
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.pytorch;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class PytorchTask extends AbstractTaskExecutor {
+
+    private final ShellCommandExecutor shellCommandExecutor;
+    protected PytorchParameters pytorchParameters;
+    protected TaskExecutionContext taskExecutionContext;
+    private PythonEnvManager pythonEnvManager;
+
+    public PytorchTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
+            taskExecutionContext,
+            logger);
+    }
+
+    @Override
+    public void init() {
+        logger.info("python task params {}", taskExecutionContext.getTaskParams());
+
+        pytorchParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PytorchParameters.class);
+
+        if (!pytorchParameters.checkParameters()) {
+            throw new TaskException("python task params is not valid");
+        }
+
+        pythonEnvManager = new PythonEnvManager();
+        pythonEnvManager.setPythonEnvTool(pytorchParameters.getPythonEnvTool());
+        pythonEnvManager.setCondaPythonVersion(pytorchParameters.getCondaPythonVersion());
+    }
+
+    @Override
+    public void handle() throws Exception {
+        try {
+            String command = buildPythonExecuteCommand();
+            TaskResponse taskResponse = shellCommandExecutor.run(command);
+            setExitStatusCode(taskResponse.getExitStatusCode());
+            setAppIds(taskResponse.getAppIds());
+            setProcessId(taskResponse.getProcessId());
+            setVarPool(shellCommandExecutor.getVarPool());
+        } catch (Exception e) {
+            setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+            throw e;
+        }
+    }
+
+
+    public String buildPythonExecuteCommand() throws Exception {
+        List<String> args = new ArrayList<>();
+
+        String pythonPath = pytorchParameters.getPythonPath();
+
+        if (GitProjectManager.isGitPath(pythonPath)) {
+            GitProjectManager gpm = new GitProjectManager();
+            gpm.setPath(pythonPath);
+            gpm.setBaseDir(taskExecutionContext.getExecutePath());
+            gpm.prepareProject();
+            pytorchParameters.setPythonPath(gpm.getGitLocalPath());
+        }
+
+        args.add(String.format("export PYTHONPATH=%s", pytorchParameters.getPythonPath()));
+
+        if (pytorchParameters.getIsCreateEnvironment()) {
+            String buildEnvCommand = pythonEnvManager.getBuildEnvCommand(pytorchParameters.getRequirementPath());
+            args.add(buildEnvCommand);
+        }
+
+        String scriptParams = pytorchParameters.getScriptParams();
+        if (scriptParams != null && !scriptParams.isEmpty()) {
+            args.add(String.format("%s %s %s", getPythonCommand(), pytorchParameters.getScriptPath(), pytorchParameters.getScriptParams()));
+        } else {
+            args.add(String.format("%s %s", getPythonCommand(), pytorchParameters.getScriptPath()));
+
+        }
+
+        Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
+        return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParamUtils.convert(paramsMap));
+    }
+
+    private String getPythonCommand() {
+        String pythonCommand;
+        if (pytorchParameters.getIsCreateEnvironment()) {
+            pythonCommand = pythonEnvManager.getPythonCommand();
+        } else {
+            pythonCommand = pytorchParameters.getPythonCommand();
+        }
+        return pythonCommand;
+
+    }
+
+
+    @Override
+    public AbstractParameters getParameters() {
+        return pytorchParameters;
+    }
+}
+
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskChannel.java
new file mode 100644
index 0000000000..8e06173f4e
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskChannel.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.pytorch;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+public class PytorchTaskChannel implements TaskChannel {
+
+    @Override
+    public void cancelApplication(boolean status) {
+
+    }
+
+    @Override
+    public PytorchTask createTask(TaskExecutionContext taskRequest) {
+        return new PytorchTask(taskRequest);
+    }
+
+    @Override
+    public AbstractParameters parseParameters(ParametersNode parametersNode) {
+        return JSONUtils.parseObject(parametersNode.getTaskParams(), PytorchParameters.class);
+    }
+
+    @Override
+    public ResourceParametersHelper getResources(String parameters) {
+        return null;
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskChannelFactory.java
new file mode 100644
index 0000000000..be7e3d6545
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskChannelFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.pytorch;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(TaskChannelFactory.class)
+public class PytorchTaskChannelFactory implements TaskChannelFactory {
+    @Override
+    public TaskChannel create() {
+        return new PytorchTaskChannel();
+    }
+
+    @Override
+    public String getName() {
+        return "PYTORCH";
+    }
+
+    @Override
+    public List<PluginParams> getParams() {
+        return Collections.emptyList();
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java
new file mode 100644
index 0000000000..835d31fe72
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.pytorch;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.lang3.SystemUtils;
+
+import java.io.File;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Date;
+import java.util.Set;
+import java.util.UUID;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({JSONUtils.class, PropertyUtils.class,})
+@PowerMockIgnore({"javax.*"})
+@SuppressStaticInitializationFor("org.apache.dolphinscheduler.spi.utils.PropertyUtils")
+public class PytorchTaskTest {
+
+    private final String pythonPath = ".";
+    private final String requirementPath = "requirements.txt";
+
+    @Before
+    public void before() {
+        PowerMockito.mockStatic(PropertyUtils.class);
+    }
+
+    @Test
+    public void testPythonEnvManager() {
+        PythonEnvManager envManager = new PythonEnvManager();
+
+        envManager.setPythonEnvTool(PythonEnvManager.ENV_TOOL_CONDA);
+        envManager.setCondaPythonVersion("3.9");
+        String condaEnvCommand39 = envManager.getBuildEnvCommand(requirementPath);
+        Assert.assertEquals(condaEnvCommand39, "conda create -y python=3.9 -p ./venv && source activate ./venv && python -m pip install -r " + requirementPath);
+
+        envManager.setCondaPythonVersion("3.8");
+        String condaEnvCommand38 = envManager.getBuildEnvCommand(requirementPath);
+        Assert.assertEquals(condaEnvCommand38, "conda create -y python=3.8 -p ./venv && source activate ./venv && python -m pip install -r " + requirementPath);
+
+
+        envManager.setPythonEnvTool(PythonEnvManager.ENV_TOOL_VENV);
+        String venvEnvCommand = envManager.getBuildEnvCommand(requirementPath);
+        Assert.assertEquals(venvEnvCommand, "virtualenv -p ${PYTHON_HOME} ./venv && source ./venv/bin/activate && python -m pip install -r " + requirementPath);
+
+    }
+
+    @Test
+    public void testGitProject() {
+
+        assertFalse(GitProjectManager.isGitPath("dolphinscheduler/test"));
+        assertFalse(GitProjectManager.isGitPath("/dolphinscheduler/test"));
+        assertTrue(GitProjectManager.isGitPath("https://github.com/apache/dolphinscheduler.git"));
+        assertTrue(GitProjectManager.isGitPath("git@github.com:apache/dolphinscheduler.git"));
+        assertTrue(GitProjectManager.isGitPath("git@github.com:apache/dolphinscheduler.git#doc"));
+
+        GitProjectManager gpm1 = new GitProjectManager();
+        gpm1.setPath("git@github.com:apache/dolphinscheduler.git#doc");
+        Assert.assertEquals("git@github.com:apache/dolphinscheduler.git", gpm1.getGitUrl());
+        Assert.assertEquals("./GIT_PROJECT/doc", gpm1.getGitLocalPath());
+
+        GitProjectManager gpm2 = new GitProjectManager();
+        gpm2.setPath("git@github.com:apache/dolphinscheduler.git");
+        Assert.assertEquals("git@github.com:apache/dolphinscheduler.git", gpm2.getGitUrl());
+        Assert.assertEquals("./GIT_PROJECT", gpm2.getGitLocalPath());
+
+    }
+
+    @Test
+    public void testBuildPythonCommandWithoutCreateEnvironment() throws Exception {
+        PytorchParameters parameters = new PytorchParameters();
+        parameters.setScript("main.py");
+        parameters.setScriptParams("--epochs=1 --dry-run");
+
+        PytorchTask task1 = initTask(parameters);
+        Assert.assertEquals(task1.buildPythonExecuteCommand(),
+            "export PYTHONPATH=.\n" +
+                "${PYTHON_HOME} main.py --epochs=1 --dry-run");
+
+        parameters.setPythonCommand("");
+        PytorchTask task2 = initTask(parameters);
+        Assert.assertEquals(task2.buildPythonExecuteCommand(),
+            "export PYTHONPATH=.\n" +
+                "${PYTHON_HOME} main.py --epochs=1 --dry-run");
+
+        parameters.setPythonCommand("/usr/bin/python");
+        PytorchTask task3 = initTask(parameters);
+        Assert.assertEquals(task3.buildPythonExecuteCommand(),
+            "export PYTHONPATH=.\n" +
+                "/usr/bin/python main.py --epochs=1 --dry-run");
+
+    }
+
+
+    @Test
+    public void testBuildPythonCommandWithCreateCondeEnv() throws Exception {
+        PytorchParameters parameters = new PytorchParameters();
+        parameters.setPythonPath(pythonPath);
+        parameters.setIsCreateEnvironment(true);
+        parameters.setCondaPythonVersion("3.6");
+        parameters.setPythonEnvTool(PythonEnvManager.ENV_TOOL_CONDA);
+        parameters.setRequirements("requirements.txt");
+        parameters.setScript("main.py");
+        parameters.setScriptParams("--epochs=1 --dry-run");
+
+        PytorchTask task = initTask(parameters);
+        Assert.assertEquals(task.buildPythonExecuteCommand(),
+            "export PYTHONPATH=.\n" +
+                "conda create -y python=3.6 -p ./venv && source activate ./venv && python -m pip install -r requirements.txt\n" +
+                "./venv/bin/python main.py --epochs=1 --dry-run");
+    }
+
+    @Test
+    public void testBuildPythonCommandWithCreateVenvEnv() throws Exception {
+        PytorchParameters parameters = new PytorchParameters();
+        parameters.setPythonPath(pythonPath);
+        parameters.setIsCreateEnvironment(true);
+        parameters.setPythonEnvTool(PythonEnvManager.ENV_TOOL_VENV);
+        parameters.setRequirements("requirements.txt");
+        parameters.setScript("main.py");
+        parameters.setScriptParams("--epochs=1 --dry-run");
+
+        PytorchTask task = initTask(parameters);
+        Assert.assertEquals(task.buildPythonExecuteCommand(),
+            "export PYTHONPATH=.\n" +
+                "virtualenv -p ${PYTHON_HOME} ./venv && source ./venv/bin/activate && python -m pip install -r requirements.txt\n" +
+                "./venv/bin/python main.py --epochs=1 --dry-run");
+
+    }
+
+    @Test
+    public void testGetPossiblePath() throws Exception {
+        String requirements = "requirements.txt";
+        String script = "train.py";
+        String pythonPath = Paths.get("/tmp", UUID.randomUUID().toString()).toString();
+
+        PytorchParameters parameters = new PytorchParameters();
+        parameters.setRequirements(requirements);
+        parameters.setScript(script);
+        parameters.setPythonPath(pythonPath);
+        parameters.setIsCreateEnvironment(true);
+        parameters.setPythonEnvTool(PythonEnvManager.ENV_TOOL_VENV);
+
+        PytorchTask task = initTask(parameters);
+
+        String requirementFile = Paths.get(pythonPath, requirements).toString();
+        String scriptFile = Paths.get(pythonPath, script).toString();
+        createFile(requirementFile);
+        createFile(scriptFile);
+
+        String expected = "export PYTHONPATH=%s\n" +
+            "virtualenv -p ${PYTHON_HOME} ./venv && source ./venv/bin/activate && python -m pip install -r %s\n" +
+            "./venv/bin/python %s";
+        System.out.println(task.buildPythonExecuteCommand());
+        Assert.assertEquals(String.format(expected, pythonPath, requirementFile, scriptFile), task.buildPythonExecuteCommand());
+
+    }
+
+
+    private PytorchTask initTask(PytorchParameters pytorchParameters) {
+        TaskExecutionContext taskExecutionContext = createContext(pytorchParameters);
+        PytorchTask task = new PytorchTask(taskExecutionContext);
+        task.init();
+        return task;
+    }
+
+    public TaskExecutionContext createContext(PytorchParameters pytorchParameters) {
+        String parameters = JSONUtils.toJsonString(pytorchParameters);
+        TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
+        Mockito.when(taskExecutionContext.getTaskLogName()).thenReturn("PytorchTest");
+        String APP_ID = UUID.randomUUID().toString();
+        String folder = String.format("/tmp/dolphinscheduler_PytorchTest_%s", APP_ID);
+        Mockito.when(taskExecutionContext.getExecutePath()).thenReturn(folder);
+        Mockito.when(taskExecutionContext.getTaskAppId()).thenReturn(APP_ID);
+        Mockito.when(taskExecutionContext.getTenantCode()).thenReturn("root");
+        Mockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date());
+        Mockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000);
+        Mockito.when(taskExecutionContext.getLogPath()).thenReturn(folder + "/log");
+        Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+        String envirementConfig = "export PATH=$HOME/anaconda3/bin:$PATH\n" + "export PYTHON_HOME=/bin/python";
+        Mockito.when(taskExecutionContext.getEnvironmentConfig()).thenReturn(envirementConfig);
+
+        String userName = System.getenv().get("USER");
+        Mockito.when(taskExecutionContext.getTenantCode()).thenReturn(userName);
+
+        TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
+        return taskExecutionContext;
+    }
+
+    private void createFile(String fileName) throws Exception {
+        File file = new File(fileName);
+        Path path = file.toPath();
+        Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
+        FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
+        if (SystemUtils.IS_OS_WINDOWS) {
+            Files.createFile(path);
+        } else {
+            if (!file.getParentFile().exists()) {
+                file.getParentFile().mkdirs();
+            }
+            try {
+                Files.createFile(path, attr);
+            } catch (FileAlreadyExistsException ex) {
+                // this is expected
+            }
+        }
+
+    }
+
+}
+
+
diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml
index a432fc3fd1..35444c62ad 100644
--- a/dolphinscheduler-task-plugin/pom.xml
+++ b/dolphinscheduler-task-plugin/pom.xml
@@ -60,6 +60,7 @@
         <module>dolphinscheduler-task-sagemaker</module>
         <module>dolphinscheduler-task-chunjun</module>
         <module>dolphinscheduler-task-flink-stream</module>
+        <module>dolphinscheduler-task-pytorch</module>
     </modules>
 
     <dependencyManagement>
diff --git a/dolphinscheduler-ui/public/images/task-icons/pytorch.png b/dolphinscheduler-ui/public/images/task-icons/pytorch.png
new file mode 100644
index 0000000000..e455a134f1
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/pytorch.png differ
diff --git a/dolphinscheduler-ui/public/images/task-icons/pytorch_hover.png b/dolphinscheduler-ui/public/images/task-icons/pytorch_hover.png
new file mode 100644
index 0000000000..c1832af6f4
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/pytorch_hover.png differ
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index a88bc44e12..f8751df123 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -753,6 +753,17 @@ export default {
     dinky_address_tips: 'Please enter the url of your dinky',
     dinky_task_id: 'Dinky task id',
     dinky_task_id_tips: 'Please enter the task id of your dinky',
-    dinky_online: 'Online task'
+    dinky_online: 'Online task',
+    pytorch_script: 'Python Script',
+    pytorch_script_params: 'Script Input Parameters',
+    pytorch_other_params: 'Show More Configurations',
+    pytorch_python_path: 'Project Path',
+    pytorch_is_create_environment: 'Create An Environment Or Not',
+    pytorch_python_command: 'Python Command Path',
+    pytorch_python_command_tips: 'If empty,will be set $PYTHON_HOME',
+    pytorch_python_env_tool: 'Python Environment Manager Tool',
+    pytorch_requirements: 'Requirement File',
+    pytorch_conda_python_version: 'Python Version',
+    pytorch_conda_python_version_tips: 'Please enter the version number, such as 3.6, 3.7, 3.x'
   }
 }
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index 48a01fc5a0..6f423786ba 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -738,6 +738,17 @@ export default {
     dinky_address_tips: '请输入 Dinky 地址',
     dinky_task_id: 'dinky 作业ID',
     dinky_task_id_tips: '请输入作业 ID',
-    dinky_online: '是否上线作业'
+    dinky_online: '是否上线作业',
+    pytorch_script: 'python脚本',
+    pytorch_script_params: '脚本启动参数',
+    pytorch_other_params: '展开更多配置',
+    pytorch_python_path: 'python项目地址',
+    pytorch_is_create_environment: '是否创建新环境',
+    pytorch_python_command: 'python命令路径',
+    pytorch_python_command_tips: '若为空,则使用$PYTHON_HOME',
+    pytorch_python_env_tool: 'python环境管理工具',
+    pytorch_requirements: '依赖文件',
+    pytorch_conda_python_version: 'python版本',
+    pytorch_conda_python_version_tips: '请输入版本号,如 3.6, 3.7, 3.x等'
   }
 }
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
index d1e2e2b3a2..74c721bbe1 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
@@ -76,3 +76,4 @@ export { useDinky } from './use-dinky'
 export { useSagemaker } from './use-sagemaker'
 export { useChunjun } from './use-chunjun'
 export { useChunjunDeployMode } from './use-chunjun-deploy-mode'
+export { usePytorch } from './use-pytorch'
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-pytorch.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-pytorch.ts
new file mode 100644
index 0000000000..a74e56f992
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-pytorch.ts
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { useI18n } from 'vue-i18n'
+import type { IJsonItem } from '../types'
+import { watch, ref } from 'vue'
+import { useCustomParams, useResources } from '.'
+
+export function usePytorch(model: { [field: string]: any }): IJsonItem[] {
+  const { t } = useI18n()
+
+  const isCreateEnvironmentSpan = ref(0)
+  const pythonPathSpan = ref(0)
+  const pythonEnvToolSpan = ref(0)
+  const pythonCommandSpan = ref(0)
+  const requirementsSpan = ref(0)
+  const condaPythonVersionSpan = ref(0)
+
+  const setFlag = () => {
+    model.showCreateEnvironment = model.isCreateEnvironment && model.otherParams
+    model.showCreateConda =
+      model.showCreateEnvironment && model.pythonEnvTool === 'conda'
+        ? true
+        : false
+    model.showCreateEnv =
+      model.showCreateEnvironment && model.pythonEnvTool === 'virtualenv'
+        ? true
+        : false
+  }
+
+  const resetSpan = () => {
+    isCreateEnvironmentSpan.value = model.otherParams ? 12 : 0
+    pythonPathSpan.value = model.otherParams ? 24 : 0
+    pythonEnvToolSpan.value = model.showCreateEnvironment ? 12 : 0
+    pythonCommandSpan.value = ~model.showCreateEnvironment & model.otherParams ? 12 : 0
+    requirementsSpan.value = model.showCreateEnvironment ? 24 : 0
+    condaPythonVersionSpan.value = model.showCreateConda ? 24 : 0
+  }
+
+  watch(
+    () => [model.isCreateEnvironment, model.pythonEnvTool, model.otherParams],
+    () => {
+      setFlag()
+      resetSpan()
+    }
+  )
+
+  return [
+    {
+      type: 'input',
+      field: 'script',
+      name: t('project.node.pytorch_script'),
+      span: 24
+    },
+    {
+      type: 'input',
+      field: 'scriptParams',
+      name: t('project.node.pytorch_script_params'),
+      span: 24
+    },
+    {
+      type: 'switch',
+      field: 'otherParams',
+      name: t('project.node.pytorch_other_params'),
+      span: 24
+    },
+    {
+      type: 'input',
+      field: 'pythonPath',
+      name: t('project.node.pytorch_python_path'),
+      span: pythonPathSpan
+    },
+    {
+      type: 'switch',
+      field: 'isCreateEnvironment',
+      name: t('project.node.pytorch_is_create_environment'),
+      span: isCreateEnvironmentSpan
+    },
+    {
+      type: 'input',
+      field: 'pythonCommand',
+      name: t('project.node.pytorch_python_command'),
+      span: pythonCommandSpan,
+      props: {
+        placeholder: t('project.node.pytorch_python_command_tips')
+      }
+    },
+    {
+      type: 'select',
+      field: 'pythonEnvTool',
+      name: t('project.node.pytorch_python_env_tool'),
+      span: pythonEnvToolSpan,
+      options: PYTHON_ENV_TOOL
+    },
+    {
+      type: 'input',
+      field: 'requirements',
+      name: t('project.node.pytorch_requirements'),
+      span: requirementsSpan
+    },
+    {
+      type: 'input',
+      field: 'condaPythonVersion',
+      name: t('project.node.pytorch_conda_python_version'),
+      span: condaPythonVersionSpan,
+      props: {
+        placeholder: t('project.node.pytorch_conda_python_version_tips')
+      }
+    },
+    useResources(),
+    ...useCustomParams({ model, field: 'localParams', isSimple: false })
+  ]
+}
+
+export const PYTHON_ENV_TOOL = [
+  {
+    label: 'conda',
+    value: 'conda'
+  },
+  {
+    label: 'virtualenv',
+    value: 'virtualenv'
+  }
+]
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index be2e1d0e17..becaafea85 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -386,6 +386,16 @@ export function formatParams(data: INodeData): {
   if (data.taskType === 'SAGEMAKER') {
     taskParams.sagemakerRequestJson = data.sagemakerRequestJson
   }
+  if (data.taskType === 'PYTORCH') {
+    taskParams.script = data.script
+    taskParams.scriptParams = data.scriptParams
+    taskParams.pythonPath = data.pythonPath
+    taskParams.isCreateEnvironment = data.isCreateEnvironment
+    taskParams.pythonCommand = data.pythonCommand
+    taskParams.pythonEnvTool = data.pythonEnvTool
+    taskParams.requirements = data.requirements
+    taskParams.condaPythonVersion = data.condaPythonVersion
+  }
 
   if (data.taskType === 'DINKY') {
     taskParams.address = data.address
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
index f2b0d148bb..0ac47ea9b4 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
@@ -43,6 +43,7 @@ import { useDvc } from './use-dvc'
 import { useDinky } from './use-dinky'
 import { userSagemaker } from './use-sagemaker'
 import { useChunjun } from './use-chunjun'
+import { usePytorch } from './use-pytorch'
 
 export default {
   SHELL: useShell,
@@ -72,5 +73,6 @@ export default {
   DINKY: useDinky,
   SAGEMAKER: userSagemaker,
   CHUNJUN: useChunjun,
-  FLINK_STREAM: useFlinkStream
+  FLINK_STREAM: useFlinkStream,
+  PYTORCH: usePytorch
 }
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pytorch.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pytorch.ts
new file mode 100644
index 0000000000..7b24a07adf
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pytorch.ts
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import * as Fields from '../fields/index'
+import type { IJsonItem, INodeData, ITaskData } from '../types'
+
+export function usePytorch({
+  projectCode,
+  from = 0,
+  readonly,
+  data
+}: {
+  projectCode: number
+  from?: number
+  readonly?: boolean
+  data?: ITaskData
+}) {
+  const model = reactive({
+    name: '',
+    taskType: 'PYTORCH',
+    flag: 'YES',
+    description: '',
+    timeoutFlag: false,
+    localParams: [],
+    environmentCode: null,
+    failRetryInterval: 1,
+    failRetryTimes: 0,
+    workerGroup: 'default',
+    delayTime: 0,
+    timeout: 30,
+    timeoutNotifyStrategy: ['WARN'],
+    pythonEnvTool: 'conda',
+    pythonCommand: '${PYTHON_HOME}',
+    condaPythonVersion: '3.7',
+    requirements: 'requirements.txt',
+    pythonPath: '.'
+  } as INodeData)
+
+  let extra: IJsonItem[] = []
+  if (from === 1) {
+    extra = [
+      Fields.useTaskType(model, readonly),
+      Fields.useProcessName({
+        model,
+        projectCode,
+        isCreate: !data?.id,
+        from,
+        processName: data?.processName
+      })
+    ]
+  }
+
+  return {
+    json: [
+      Fields.useName(from),
+      ...extra,
+      Fields.useRunFlag(),
+      Fields.useDescription(),
+      Fields.useTaskPriority(),
+      Fields.useWorkerGroup(),
+      Fields.useEnvironmentName(model, !model.id),
+      ...Fields.useTaskGroup(model, projectCode),
+      ...Fields.useFailed(),
+      ...Fields.useResourceLimit(),
+      Fields.useDelayTime(model),
+      ...Fields.useTimeoutAlarm(model),
+      ...Fields.usePytorch(model),
+      Fields.usePreTasks()
+    ] as IJsonItem[],
+    model
+  }
+}
+
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index bda369f6fd..6260be2140 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -364,6 +364,14 @@ interface ITaskParams {
   taskId?: string
   online?: boolean
   sagemakerRequestJson?: string
+  script?: string
+  scriptParams?: string
+  pythonPath?: string
+  isCreateEnvironment?: string
+  pythonCommand?: string
+  pythonEnvTool?: string
+  requirements?: string
+  condaPythonVersion?: string
 }
 
 interface INodeData
diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
index 3b42fc6fed..26505d6a39 100644
--- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
@@ -43,6 +43,7 @@ export type TaskType =
   | 'SAGEMAKER'
   | 'CHUNJUN'
   | 'FLINK_STREAM'
+  | 'PYTORCH'
 
 export type TaskExecuteType = 'STREAM' | 'BATCH'
 
@@ -146,6 +147,10 @@ export const TASK_TYPES_MAP = {
     alias: 'FLINK_STREAM',
     helperLinkDisable: true,
     taskExecuteType: 'STREAM'
+  },
+  PYTORCH: {
+    alias: 'Pytorch',
+    helperLinkDisable: true
   }
 } as {
   [key in TaskType]: {
diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
index 6b8d7e6de7..0549372ac0 100644
--- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
+++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
@@ -183,6 +183,9 @@ $bgLight: #ffffff;
     &.icon-chunjun {
       background-image: url('/images/task-icons/chunjun.png');
     }
+    &.icon-pytorch {
+      background-image: url('/images/task-icons/pytorch.png');
+    }
   }
 
   &:hover {
@@ -269,6 +272,9 @@ $bgLight: #ffffff;
       &.icon-chunjun {
         background-image: url('/images/task-icons/chunjun_hover.png');
       }
+      &.icon-pytorch {
+        background-image: url('/images/task-icons/pytorch_hover.png');
+      }
     }
   }
 }