You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/05/31 03:00:27 UTC

[dolphinscheduler] branch dev updated: [feat][task] Add OpenMLDB task plugin (#10198)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new c5961aed25 [feat][task] Add OpenMLDB task plugin (#10198)
c5961aed25 is described below

commit c5961aed25329b1cbd0aab34fc49cc7cffb39753
Author: HuangWei <hw...@qq.com>
AuthorDate: Tue May 31 11:00:21 2022 +0800

    [feat][task] Add OpenMLDB task plugin (#10198)
---
 docs/configs/docsdev.js                            |   8 +
 docs/docs/en/guide/task/openmldb.md                |  76 +++++++++
 docs/docs/zh/guide/task/openmldb.md                |  68 ++++++++
 .../img/tasks/demo/openmldb-feature-extraction.png | Bin 0 -> 153223 bytes
 docs/img/tasks/demo/openmldb-load-data.png         | Bin 0 -> 91248 bytes
 docs/img/tasks/icons/openmldb.png                  | Bin 0 -> 90057 bytes
 .../dolphinscheduler-task-all/pom.xml              |   6 +
 .../dolphinscheduler-task-openmldb/pom.xml         |  45 ++++++
 .../plugin/task/openmldb/OpenmldbParameters.java   |  90 +++++++++++
 .../plugin/task/openmldb/OpenmldbTask.java         | 176 +++++++++++++++++++++
 .../plugin/task/openmldb/OpenmldbTaskChannel.java  |  47 ++++++
 .../task/openmldb/OpenmldbTaskChannelFactory.java  |  44 ++++++
 .../plugin/task/openmldb/OpenmldbTaskTest.java     |  89 +++++++++++
 .../plugin/task/python/PythonTask.java             |  29 ++--
 dolphinscheduler-task-plugin/pom.xml               |   1 +
 .../public/images/task-icons/openmldb.png          | Bin 0 -> 90057 bytes
 .../public/images/task-icons/openmldb_hover.png    | Bin 0 -> 10323 bytes
 dolphinscheduler-ui/src/locales/en_US/project.ts   |   8 +
 dolphinscheduler-ui/src/locales/zh_CN/project.ts   |   8 +
 .../projects/task/components/node/fields/index.ts  |   1 +
 .../task/components/node/fields/use-openmldb.ts    |  87 ++++++++++
 .../projects/task/components/node/format-data.ts   |   7 +
 .../projects/task/components/node/tasks/index.ts   |   4 +-
 .../task/components/node/tasks/use-openmldb.ts     |  85 ++++++++++
 .../views/projects/task/components/node/types.ts   |   3 +
 .../src/views/projects/task/constants/task-type.ts |   5 +
 .../workflow/components/dag/dag.module.scss        |   6 +
 pom.xml                                            |   5 +
 28 files changed, 883 insertions(+), 15 deletions(-)

diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js
index 0f73e3a0bd..bf72d5f9ed 100644
--- a/docs/configs/docsdev.js
+++ b/docs/configs/docsdev.js
@@ -161,6 +161,10 @@ export default {
                                 title: 'MLflow',
                                 link: '/en-us/docs/dev/user_doc/guide/task/mlflow.html',
                             },
+                            {
+                                title: 'Openmldb',
+                                link: '/en-us/docs/dev/user_doc/guide/task/openmldb.html',
+                            },
                         ],
                     },
                     {
@@ -525,6 +529,10 @@ export default {
                                 title: 'MLflow',
                                 link: '/zh-cn/docs/dev/user_doc/guide/task/mlflow.html',
                             },
+                            {
+                                title: 'Openmldb',
+                                link: '/zh-cn/docs/dev/user_doc/guide/task/openmldb.html',
+                            },
                         ],
                     },
                     {
diff --git a/docs/docs/en/guide/task/openmldb.md b/docs/docs/en/guide/task/openmldb.md
new file mode 100644
index 0000000000..e2e48ea84a
--- /dev/null
+++ b/docs/docs/en/guide/task/openmldb.md
@@ -0,0 +1,76 @@
+# OpenMLDB Node
+
+## Overview
+
+[OpenMLDB](https://openmldb.ai/) is an excellent open source machine learning database, providing a full-stack 
+FeatureOps solution for production.
+
+The OpenMLDB task plugin is used to execute tasks on an OpenMLDB cluster.
+
+## Create Task
+
+- Click `Project -> Management-Project -> Name-Workflow Definition`, and click the "Create Workflow" button to enter the
+  DAG editing page.
+- Drag the <img src="/img/tasks/icons/openmldb.png" width="15"/> task node from the toolbar to the canvas.
+
+## Task Example
+
+First, here are some general DolphinScheduler parameters:
+
+- **Node name**: The node name in a workflow definition is unique.
+- **Run flag**: Identifies whether this node is scheduled normally. If it does not need to execute, select
+  `prohibition execution`.
+- **Descriptive information**: Describe the function of the node.
+- **Task priority**: When the number of worker threads is insufficient, execute in the order of priority from high
+  to low, and tasks with the same priority will execute in a first-in first-out order.
+- **Worker grouping**: Assign tasks to the machines of the worker group to execute. If `Default` is selected,
+  a worker machine is randomly selected for execution.
+- **Environment Name**: Configure the name of the environment in which to run the script.
+- **Times of failed retry attempts**: The number of times to resubmit the task after it fails.
+- **Failed retry interval**: The time interval (in minutes) between resubmissions of a failed task.
+- **Delayed execution time**: The time (in minutes) by which the task's execution is delayed.
+- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task's run time exceeds the "timeout",
+  an alarm email will be sent and the task execution will fail.
+- **Predecessor task**: Selecting a predecessor task for the current task sets the selected predecessor task as
+  upstream of the current task.
+
+### OpenMLDB Parameters
+
+**Task Parameter**
+
+- **zookeeper**: OpenMLDB cluster zookeeper address, e.g. 127.0.0.1:2181.
+- **zookeeper path**: OpenMLDB cluster zookeeper path, e.g. /openmldb.
+- **Execute Mode**: Determines the initial execution mode, offline or online. You can switch it in the SQL statement.
+- **SQL statement**: The SQL statements to execute, separated by `;`.
+- **Custom parameters**: User-defined parameters local to this task; they replace the \${variable} placeholders in the SQL statement.
+
+Here are some examples:
+
+#### Load data
+
+![load data](/img/tasks/demo/openmldb-load-data.png)
+
+We use `LOAD DATA` to load data into the OpenMLDB cluster. We select `offline` here, so the data is loaded into offline storage.
+
+#### Feature extraction
+
+![fe](/img/tasks/demo/openmldb-feature-extraction.png)
+
+We use `SELECT INTO` to do feature extraction. We select `offline` here, so the SQL runs on the offline engine.
+
+## Environment to prepare
+
+### Start the OpenMLDB cluster
+
+You need a running OpenMLDB cluster first. For a production environment, see [deploy OpenMLDB](https://openmldb.ai/docs/en/v0.5/deploy/install_deploy.html).
+
+For a quick start, you can follow
+[run OpenMLDB in docker](https://openmldb.ai/docs/zh/v0.5/quickstart/openmldb_quickstart.html#id11).
+
+### Python env
+
+The OpenMLDB task uses the OpenMLDB Python SDK to connect to the OpenMLDB cluster, so a Python environment is required.
+
+`python3` is used by default. You can set `PYTHON_HOME` to use a custom Python environment.
+
+Make sure the OpenMLDB Python SDK is installed on the host where the worker server is running, e.g. via `pip install openmldb`.
diff --git a/docs/docs/zh/guide/task/openmldb.md b/docs/docs/zh/guide/task/openmldb.md
new file mode 100644
index 0000000000..f889b978f2
--- /dev/null
+++ b/docs/docs/zh/guide/task/openmldb.md
@@ -0,0 +1,68 @@
+# OpenMLDB 节点
+
+## 综述
+
+[OpenMLDB](https://openmldb.ai/) 是一个优秀的开源机器学习数据库,提供生产级数据及特征开发全栈解决方案。
+
+OpenMLDB任务组件可以连接OpenMLDB集群执行任务。
+
+## 创建任务
+
+- 点击项目管理-项目名称-工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面;
+- 拖动工具栏的 <img src="/img/tasks/icons/openmldb.png" width="15"/> 任务节点到画板中。
+
+## 任务样例
+
+首先介绍一些DS通用参数:
+
+- **节点名称** :设置任务的名称。一个工作流定义中的节点名称是唯一的。
+- **运行标志** :标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
+- **描述** :描述该节点的功能。
+- **任务优先级** :worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
+- **Worker 分组** :任务分配给 worker 组的机器执行,选择 Default,会随机选择一台 worker 机执行。
+- **环境名称** :配置运行脚本的环境。
+- **失败重试次数** :任务失败重新提交的次数。
+- **失败重试间隔** :任务失败重新提交任务的时间间隔,以分钟为单位。
+- **延迟执行时间** :任务延迟执行的时间,以分钟为单位。
+- **超时告警** :勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败。
+- **前置任务** :选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
+
+### OpenMLDB 参数
+
+**任务参数**
+
+- **zookeeper地址** :OpenMLDB集群连接地址中的zookeeper地址, e.g. 127.0.0.1:2181。
+- **zookeeper路径** : OpenMLDB集群连接地址中的zookeeper路径, e.g. /openmldb。
+- **执行模式** :初始执行模式(离线/在线),你可以在sql语句中随时切换。
+- **SQL语句** :SQL语句。
+- 自定义参数:是PYTHON局部的用户自定义参数,会替换脚本中以${变量}的内容。
+
+下面有几个例子:
+
+#### 导入数据
+
+![load data](/img/tasks/demo/openmldb-load-data.png)
+
+我们使用`LOAD DATA`语句导入数据到OpenMLDB集群。因为选择的是离线执行模式,所以将会导入数据到离线存储中。
+
+#### 特征抽取
+
+![fe](/img/tasks/demo/openmldb-feature-extraction.png)
+
+我们使用`SELECT INTO`进行特征抽取。因为选择的是离线执行模式,所以会使用离线引擎做特征计算。
+
+## 环境准备
+
+### OpenMLDB 启动
+
+执行任务之前,你需要启动OpenMLDB集群。如果是在生产环境,请参考[deploy OpenMLDB](https://openmldb.ai/docs/zh/v0.5/deploy/install_deploy.html).
+
+你可以参考[在docker中运行OpenMLDB集群](https://openmldb.ai/docs/zh/v0.5/quickstart/openmldb_quickstart.html#id11) 快速启动。
+
+### Python 环境
+
+OpenMLDB任务组件将使用OpenMLDB Python SDK来连接OpenMLDB。所以你需要Python环境。
+
+我们默认使用`python3`,你可以通过配置`PYTHON_HOME`来设置自己的Python环境。
+
+请确保已通过`pip install openmldb`,在worker server的主机中安装了OpenMLDB Python SDK。
diff --git a/docs/img/tasks/demo/openmldb-feature-extraction.png b/docs/img/tasks/demo/openmldb-feature-extraction.png
new file mode 100644
index 0000000000..9f5fb195ac
Binary files /dev/null and b/docs/img/tasks/demo/openmldb-feature-extraction.png differ
diff --git a/docs/img/tasks/demo/openmldb-load-data.png b/docs/img/tasks/demo/openmldb-load-data.png
new file mode 100644
index 0000000000..559c68a86e
Binary files /dev/null and b/docs/img/tasks/demo/openmldb-load-data.png differ
diff --git a/docs/img/tasks/icons/openmldb.png b/docs/img/tasks/icons/openmldb.png
new file mode 100644
index 0000000000..a06cbfe855
Binary files /dev/null and b/docs/img/tasks/icons/openmldb.png differ
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
index 5ecec9263e..2422a4628e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
@@ -165,6 +165,12 @@
             <artifactId>dolphinscheduler-task-mlflow</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-openmldb</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/pom.xml
new file mode 100644
index 0000000000..065d99386f
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dolphinscheduler-task-plugin</artifactId>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dolphinscheduler-task-openmldb</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-spi</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-python</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbParameters.java
new file mode 100644
index 0000000000..5f8a9b9047
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbParameters.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.openmldb;
+
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.List;
+
+public class OpenmldbParameters extends AbstractParameters {
+
+    private String zk;
+    private String zkPath;
+    private String executeMode;
+    /**
+     * origin sql script
+     */
+    private String sql;
+
+    /**
+     * resource list
+     */
+    private List<ResourceInfo> resourceList;
+
+    public String getZk() {
+        return zk;
+    }
+
+    public void setZk(String zk) {
+        this.zk = zk;
+    }
+
+    public String getZkPath() {
+        return zkPath;
+    }
+
+    public void setZkPath(String zkPath) {
+        this.zkPath = zkPath;
+    }
+
+    public String getExecuteMode() {
+        return executeMode;
+    }
+
+    public void setExecuteMode(String executeMode) {
+        this.executeMode = executeMode;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
+    public List<ResourceInfo> getResourceList() {
+        return resourceList;
+    }
+
+    public void setResourceList(List<ResourceInfo> resourceList) {
+        this.resourceList = resourceList;
+    }
+
+    @Override
+    public boolean checkParameters() {
+        return StringUtils.isNotEmpty(zk) && StringUtils.isNotEmpty(zkPath) && StringUtils.isNotEmpty(sql);
+    }
+
+    @Override
+    public List<ResourceInfo> getResourceFilesList() {
+        return this.resourceList;
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
new file mode 100644
index 0000000000..a93c191f0c
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.openmldb;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.plugin.task.python.PythonTask;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.nio.file.Paths;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * openmldb task
+ */
+public class OpenmldbTask extends PythonTask {
+
+    /**
+     * openmldb parameters
+     */
+    private OpenmldbParameters openmldbParameters;
+
+    /**
+     * python process(openmldb only supports version 3 by default)
+     */
+    private static final String OPENMLDB_PYTHON = "python3";
+    private static final Pattern PYTHON_PATH_PATTERN = Pattern.compile("/bin/python[\\d.]*$");
+
+    /**
+     * constructor
+     *
+     * @param taskRequest taskRequest
+     */
+    public OpenmldbTask(TaskExecutionContext taskRequest) {
+        super(taskRequest);
+    }
+
+    @Override
+    public void init() {
+        logger.info("openmldb task params {}", taskRequest.getTaskParams());
+
+        openmldbParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), OpenmldbParameters.class);
+
+        if (openmldbParameters == null || !openmldbParameters.checkParameters()) {
+            throw new TaskException("openmldb task params is not valid");
+        }
+    }
+
+    @Override
+    @Deprecated
+    public String getPreScript() {
+        return "";
+    }
+
+    @Override
+    public AbstractParameters getParameters() {
+        return openmldbParameters;
+    }
+
+    /**
+     * build python command file path
+     *
+     * @return python command file path
+     */
+    @Override
+    protected String buildPythonCommandFilePath() {
+        return String.format("%s/openmldb_%s.py", taskRequest.getExecutePath(), taskRequest.getTaskAppId());
+    }
+
+    /**
+     * build python script content from sql
+     *
+     * @return raw python script
+     */
+    @Override
+    protected String buildPythonScriptContent() {
+        logger.info("raw sql script : {}", openmldbParameters.getSql());
+
+        String rawSQLScript = openmldbParameters.getSql().replaceAll("[\\r]?\\n", "\n");
+        Map<String, Property> paramsMap = mergeParamsWithContext(openmldbParameters);
+        rawSQLScript = ParameterUtils.convertParameterPlaceholders(rawSQLScript, ParamUtils.convert(paramsMap));
+
+        // convert sql to python script
+        String pythonScript = buildPythonScriptsFromSql(rawSQLScript);
+        logger.info("rendered python script : {}", pythonScript);
+        return pythonScript;
+    }
+
+    private String buildPythonScriptsFromSql(String rawSqlScript) {
+        // imports
+        StringBuilder builder = new StringBuilder("import openmldb\nimport sqlalchemy as db\n");
+
+        // connect to openmldb
+        builder.append(String.format("engine = db.create_engine('openmldb:///?zk=%s&zkPath=%s')\n",
+                openmldbParameters.getZk(), openmldbParameters.getZkPath()));
+        builder.append("con = engine.connect()\n");
+
+        // execute mode
+        String executeMode = openmldbParameters.getExecuteMode().toLowerCase(Locale.ROOT);
+        builder.append("con.execute(\"set @@execute_mode='").append(executeMode).append("';\")\n");
+        // Offline jobs must run synchronously; job_timeout is set to 30 min (== server.channel_keep_alive_time).
+        // A longer timeout can be set in the SQL statements themselves.
+        if (executeMode.equals("offline")) {
+            builder.append("con.execute(\"set @@sync_job=true\")\n");
+            builder.append("con.execute(\"set @@job_timeout=1800000\")\n");
+        }
+
+        // split sql to list
+        // skip the sql only has space characters
+        Pattern pattern = Pattern.compile("\\S");
+        for (String sql : rawSqlScript.split(";")) {
+            if (pattern.matcher(sql).find()) {
+                sql = sql.replaceAll("\\n", "\\\\n");
+                builder.append("con.execute(\"").append(sql).append("\")\n");
+            }
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Build the python task command.
+     * If the user has set the 'PYTHON_HOME' environment variable, the python3 binary under it is used;
+     * otherwise the command defaults to 'python3'.
+     *
+     * @param pythonFile Python file, cannot be null.
+     * @return Python execute command, e.g. 'python3 test.py'.
+     */
+    @Override
+    protected String buildPythonExecuteCommand(String pythonFile) {
+        Preconditions.checkNotNull(pythonFile, "Python file cannot be null");
+        return getPythonCommand() + " " + pythonFile;
+    }
+
+    private String getPythonCommand() {
+        String pythonHome = System.getenv(PYTHON_HOME);
+        return getPythonCommand(pythonHome);
+    }
+
+    private String getPythonCommand(String pythonHome) {
+        if (StringUtils.isEmpty(pythonHome)) {
+            return OPENMLDB_PYTHON;
+        }
+        // If your python home is "xx/bin/python[xx]", you are forced to use python3
+        String pythonBinPath = "/bin/" + OPENMLDB_PYTHON;
+        Matcher matcher = PYTHON_PATH_PATTERN.matcher(pythonHome);
+        if (matcher.find()) {
+            return matcher.replaceAll(pythonBinPath);
+        }
+        return Paths.get(pythonHome, pythonBinPath).toString();
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskChannel.java
new file mode 100644
index 0000000000..27a1e349d3
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskChannel.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.openmldb;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+public class OpenmldbTaskChannel implements TaskChannel {
+    @Override
+    public void cancelApplication(boolean status) {
+
+    }
+
+    @Override
+    public OpenmldbTask createTask(TaskExecutionContext taskRequest) {
+        return new OpenmldbTask(taskRequest);
+    }
+
+    @Override
+    public AbstractParameters parseParameters(ParametersNode parametersNode) {
+        return JSONUtils.parseObject(parametersNode.getTaskParams(), OpenmldbParameters.class);
+    }
+
+    @Override
+    public ResourceParametersHelper getResources(String parameters) {
+        return null;
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskChannelFactory.java
new file mode 100644
index 0000000000..b4698c9461
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskChannelFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.openmldb;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+
+import java.util.List;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(TaskChannelFactory.class)
+public class OpenmldbTaskChannelFactory implements TaskChannelFactory {
+    @Override
+    public TaskChannel create() {
+        return new OpenmldbTaskChannel();
+    }
+
+    @Override
+    public String getName() {
+        return "OPENMLDB";
+    }
+
+    @Override
+    public List<PluginParams> getParams() {
+        return null;
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/test/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/test/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskTest.java
new file mode 100644
index 0000000000..c5ed143a5c
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/test/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.openmldb;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.powermock.reflect.Whitebox;
+
+public class OpenmldbTaskTest {
+    static class MockOpenmldbTask extends OpenmldbTask {
+        /**
+         * constructor
+         *
+         * @param taskRequest taskRequest
+         */
+        public MockOpenmldbTask(TaskExecutionContext taskRequest) {
+            super(taskRequest);
+        }
+
+        @Override
+        protected Map<String, Property> mergeParamsWithContext(AbstractParameters parameters) {
+            return new HashMap<>();
+        }
+    }
+
+    private OpenmldbTask createOpenmldbTask() {
+        return new MockOpenmldbTask(null);
+    }
+
+    @Test
+    public void buildPythonExecuteCommand() throws Exception {
+        OpenmldbTask openmldbTask = createOpenmldbTask();
+        String pythonFile = "test.py";
+        String result1 = openmldbTask.buildPythonExecuteCommand(pythonFile);
+        Assert.assertEquals("python3 test.py", result1);
+    }
+
+    @Test
+    public void buildSQLWithComment() throws Exception {
+        OpenmldbTask openmldbTask = createOpenmldbTask();
+        OpenmldbParameters openmldbParameters = new OpenmldbParameters();
+        openmldbParameters.setExecuteMode("offline");
+        String rawSQLScript = "select * from users\r\n"
+                + "-- some comment\n"
+                + "inner join order on users.order_id = order.id; \n\n;"
+                + "select * from users;";
+        openmldbParameters.setSql(rawSQLScript);
+        Whitebox.setInternalState(openmldbTask, "openmldbParameters", openmldbParameters);
+        OpenmldbParameters internal = (OpenmldbParameters) openmldbTask.getParameters();
+        Assert.assertNotNull(internal);
+        Assert.assertEquals(internal.getExecuteMode(), "offline");
+
+        String result1 = openmldbTask.buildPythonScriptContent();
+        Assert.assertEquals("import openmldb\n"
+                        + "import sqlalchemy as db\n"
+                        + "engine = db.create_engine('openmldb:///?zk=null&zkPath=null')\n"
+                        + "con = engine.connect()\n"
+                        + "con.execute(\"set @@execute_mode='offline';\")\n"
+                        + "con.execute(\"set @@sync_job=true\")\n"
+                        + "con.execute(\"set @@job_timeout=1800000\")\n"
+                        + "con.execute(\"select * from users\\n-- some comment\\ninner join order on users.order_id = "
+                        + "order.id\")\n"
+                        + "con.execute(\"select * from users\")\n"
+                , result1);
+    }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index c153847b18..87fba6fe81 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -51,16 +51,16 @@ public class PythonTask extends AbstractTaskExecutor {
     /**
      * python parameters
      */
-    private PythonParameters pythonParameters;
+    protected PythonParameters pythonParameters;
 
     /**
      * shell command executor
      */
     private ShellCommandExecutor shellCommandExecutor;
 
-    private TaskExecutionContext taskRequest;
+    protected TaskExecutionContext taskRequest;
 
-    private static final String PYTHON_HOME = "PYTHON_HOME";
+    protected static final String PYTHON_HOME = "PYTHON_HOME";
 
     private static final String DEFAULT_PYTHON_VERSION = "python";
 
@@ -109,7 +109,7 @@ public class PythonTask extends AbstractTaskExecutor {
             String pythonScriptFile = buildPythonCommandFilePath();
 
             // create this file
-            createPythonCommandFileIfNotExists(pythonScriptContent,pythonScriptFile);
+            createPythonCommandFileIfNotExists(pythonScriptContent, pythonScriptFile);
             String command = buildPythonExecuteCommand(pythonScriptFile);
 
             TaskResponse taskResponse = shellCommandExecutor.run(command);
@@ -140,7 +140,7 @@ public class PythonTask extends AbstractTaskExecutor {
      *
      * @param rawScript rawScript
      * @return String
-     * @throws StringIndexOutOfBoundsException StringIndexOutOfBoundsException
+     * @throws StringIndexOutOfBoundsException if substring index is out of bounds
      */
     private static String convertPythonScriptPlaceholders(String rawScript) throws StringIndexOutOfBoundsException {
         int len = "${setShareVar(${".length();
@@ -170,7 +170,7 @@ public class PythonTask extends AbstractTaskExecutor {
     /**
      * create python command file if not exists
      *
-     * @param pythonScript exec python script
+     * @param pythonScript     exec python script
      * @param pythonScriptFile python script file
      * @throws IOException io exception
      */
@@ -209,22 +209,23 @@ public class PythonTask extends AbstractTaskExecutor {
      * @return raw python script
      * @throws Exception exception
      */
-    private String buildPythonScriptContent() throws Exception {
+    protected String buildPythonScriptContent() throws Exception {
+        logger.info("raw python script : {}", pythonParameters.getRawScript());
         String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
+        Map<String, Property> paramsMap = mergeParamsWithContext(pythonParameters);
+        return ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
+    }
 
+    protected Map<String, Property> mergeParamsWithContext(AbstractParameters parameters) {
         // replace placeholder
-        Map<String, Property> paramsMap = ParamUtils.convert(taskRequest, pythonParameters);
+        Map<String, Property> paramsMap = ParamUtils.convert(taskRequest, parameters);
         if (MapUtils.isEmpty(paramsMap)) {
             paramsMap = new HashMap<>();
         }
         if (MapUtils.isNotEmpty(taskRequest.getParamsMap())) {
             paramsMap.putAll(taskRequest.getParamsMap());
         }
-        rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
-
-        logger.info("raw python script : {}", pythonParameters.getRawScript());
-
-        return rawPythonScript;
+        return paramsMap;
     }
 
     /**
@@ -235,7 +236,7 @@ public class PythonTask extends AbstractTaskExecutor {
      * @param pythonFile Python file, cannot be empty.
      * @return Python execute command, e.g. 'python test.py'.
      */
-    private String buildPythonExecuteCommand(String pythonFile) {
+    protected String buildPythonExecuteCommand(String pythonFile) {
         Preconditions.checkNotNull(pythonFile, "Python file cannot be null");
 
         String pythonHome = String.format("${%s}", PYTHON_HOME);
diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml
index b7f3c097cb..43f0209e0f 100644
--- a/dolphinscheduler-task-plugin/pom.xml
+++ b/dolphinscheduler-task-plugin/pom.xml
@@ -54,5 +54,6 @@
         <module>dolphinscheduler-task-zeppelin</module>
         <module>dolphinscheduler-task-jupyter</module>
         <module>dolphinscheduler-task-mlflow</module>
+        <module>dolphinscheduler-task-openmldb</module>
     </modules>
 </project>
diff --git a/dolphinscheduler-ui/public/images/task-icons/openmldb.png b/dolphinscheduler-ui/public/images/task-icons/openmldb.png
new file mode 100644
index 0000000000..a06cbfe855
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/openmldb.png differ
diff --git a/dolphinscheduler-ui/public/images/task-icons/openmldb_hover.png b/dolphinscheduler-ui/public/images/task-icons/openmldb_hover.png
new file mode 100644
index 0000000000..c42c332611
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/openmldb_hover.png differ
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index 8f37959248..381de4232c 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -659,6 +659,14 @@ export default {
     mlflowProjectRepository_tips: 'github respository or path on worker',
     mlflowProjectVersion: 'Project Version',
     mlflowProjectVersion_tips: 'git version',
+    openmldb_zk_address: 'zookeeper address',
+    openmldb_zk_address_tips: 'Please enter the zookeeper address',
+    openmldb_zk_path: 'zookeeper path',
+    openmldb_zk_path_tips: 'Please enter the zookeeper path',
+    openmldb_execute_mode: 'Execute Mode',
+    openmldb_execute_mode_tips: 'Please select the execute mode',
+    openmldb_execute_mode_offline: 'offline',
+    openmldb_execute_mode_online: 'online',
     send_email: 'Send Email',
     log_display: 'Log display',
     rows_of_result: 'rows of result',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index 5e10480c4e..ccc93958a9 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -646,6 +646,14 @@ export default {
     mlflowProjectRepository_tips: '可以为github仓库或worker上的路径',
     mlflowProjectVersion: '项目版本',
     mlflowProjectVersion_tips: '项目git版本',
+    openmldb_zk_address: 'zookeeper地址',
+    openmldb_zk_address_tips: '请输入zookeeper地址',
+    openmldb_zk_path: 'zookeeper路径',
+    openmldb_zk_path_tips: '请输入zookeeper路径',
+    openmldb_execute_mode: '执行模式',
+    openmldb_execute_mode_tips: '请选择执行模式',
+    openmldb_execute_mode_offline: '离线',
+    openmldb_execute_mode_online: '在线',
     send_email: '发送邮件',
     log_display: '日志显示',
     rows_of_result: '行查询结果',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
index 4bddd81834..bc3a2c61dd 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
@@ -68,3 +68,4 @@ export { useJupyter } from './use-jupyter'
 export { useMlflow } from './use-mlflow'
 export { useMlflowProjects } from './use-mlflow-projects'
 export { useMlflowModels } from './use-mlflow-models'
+export { useOpenmldb } from './use-openmldb'
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-openmldb.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-openmldb.ts
new file mode 100644
index 0000000000..9f5a38db6f
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-openmldb.ts
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { useI18n } from 'vue-i18n'
+import { useCustomParams, useResources } from '.'
+import type { IJsonItem } from '../types'
+
+/**
+ * Builds the OpenMLDB-specific form items: zookeeper address and path,
+ * execute mode, SQL statement, resources and custom parameters.
+ */
+export function useOpenmldb(model: { [field: string]: any }): IJsonItem[] {
+  const { t } = useI18n()
+  const options = [
+    {
+      label: t('project.node.openmldb_execute_mode_offline'),
+      value: 'offline'
+    },
+    {
+      label: t('project.node.openmldb_execute_mode_online'),
+      value: 'online'
+    }
+  ]
+  return [
+    {
+      type: 'input',
+      field: 'zk',
+      name: t('project.node.openmldb_zk_address'),
+      props: { placeholder: t('project.node.openmldb_zk_address_tips') },
+      validate: {
+        trigger: ['input', 'blur'],
+        required: true,
+        validator(_rule: unknown, value: string) {
+          if (!value) {
+            return new Error(t('project.node.openmldb_zk_address_tips'))
+          }
+        }
+      }
+    },
+    {
+      type: 'input',
+      field: 'zkPath',
+      name: t('project.node.openmldb_zk_path'),
+      props: { placeholder: t('project.node.openmldb_zk_path_tips') },
+      validate: {
+        trigger: ['input', 'blur'],
+        required: true,
+        validator(_rule: unknown, value: string) {
+          if (!value) {
+            return new Error(t('project.node.openmldb_zk_path_tips'))
+          }
+        }
+      }
+    },
+    {
+      type: 'radio',
+      field: 'executeMode',
+      name: t('project.node.openmldb_execute_mode'),
+      options
+    },
+    {
+      type: 'editor',
+      field: 'sql',
+      name: t('project.node.sql_statement'),
+      validate: {
+        trigger: ['input', 'blur'],
+        required: true,
+        message: t('project.node.sql_empty_tips')
+      }
+    },
+    useResources(),
+    ...useCustomParams({ model, field: 'localParams', isSimple: false })
+  ]
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index d993aaa234..2ea47c72cb 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -356,6 +356,13 @@ export function formatParams(data: INodeData): {
     taskParams.mlflowProjectVersion = data.mlflowProjectVersion
   }
 
+  if (data.taskType === 'OPENMLDB') {
+    taskParams.zk = data.zk
+    taskParams.zkPath = data.zkPath
+    taskParams.executeMode = data.executeMode
+    taskParams.sql = data.sql
+  }
+
   if (data.taskType === 'PIGEON') {
     taskParams.targetJobName = data.targetJobName
   }
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
index 726a796cd5..a151b70d70 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
@@ -37,6 +37,7 @@ import { useZeppelin } from './use-zeppelin'
 import { useK8s } from './use-k8s'
 import { useJupyter } from './use-jupyter'
 import { useMlflow } from './use-mlflow'
+import { useOpenmldb } from './use-openmldb'
 
 export default {
   SHELL: useShell,
@@ -60,5 +61,6 @@ export default {
   ZEPPELIN: useZeppelin,
   K8S: useK8s,
   JUPYTER: useJupyter,
-  MLFLOW: useMlflow
+  MLFLOW: useMlflow,
+  OPENMLDB: useOpenmldb
 }
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-openmldb.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-openmldb.ts
new file mode 100644
index 0000000000..9f8d560de9
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-openmldb.ts
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import * as Fields from '../fields/index'
+import type { IJsonItem, INodeData } from '../types'
+import { ITaskData } from '../types'
+
+/** Builds the OpenMLDB task form: reactive model plus ordered form items. */
+export function useOpenmldb({
+  projectCode,
+  from = 0,
+  readonly,
+  data
+}: {
+  projectCode: number
+  from?: number
+  readonly?: boolean
+  data?: ITaskData
+}) {
+  const model = reactive({
+    name: '',
+    taskType: 'OPENMLDB',
+    flag: 'YES',
+    description: '',
+    timeoutFlag: false,
+    timeoutNotifyStrategy: ['WARN'],
+    localParams: [],
+    environmentCode: null,
+    failRetryInterval: 1,
+    failRetryTimes: 0,
+    workerGroup: 'default',
+    delayTime: 0,
+    timeout: 30,
+    zk: '',
+    zkPath: '',
+    executeMode: 'offline'
+  } as INodeData)
+  // Task-type and process-name fields are only added when from === 1
+  const processFields: IJsonItem[] =
+    from === 1
+      ? [
+          Fields.useTaskType(model, readonly),
+          Fields.useProcessName({
+            model,
+            projectCode,
+            isCreate: !data?.id,
+            from,
+            processName: data?.processName
+          })
+        ]
+      : []
+  // NOTE(review): model is created without an id, so !model.id below looks
+  // always true, while isCreate above uses !data?.id — confirm the intent
+  const json = [
+    Fields.useName(from),
+    ...processFields,
+    Fields.useRunFlag(),
+    Fields.useDescription(),
+    Fields.useTaskPriority(),
+    Fields.useWorkerGroup(),
+    Fields.useEnvironmentName(model, !model.id),
+    ...Fields.useTaskGroup(model, projectCode),
+    ...Fields.useFailed(),
+    Fields.useDelayTime(model),
+    ...Fields.useTimeoutAlarm(model),
+    ...Fields.useOpenmldb(model),
+    Fields.usePreTasks()
+  ] as IJsonItem[]
+  return { json, model }
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 78f48c6b24..f44e65c95f 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -336,6 +336,9 @@ interface ITaskParams {
   deployType?: string
   deployPort?: string
   deployModelKey?: string
+  zk?: string
+  zkPath?: string
+  executeMode?: string
 }
 
 interface INodeData
diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
index e291d184e6..26cd32f544 100644
--- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
@@ -37,6 +37,7 @@ export type TaskType =
   | 'K8S'
   | 'JUPYTER'
   | 'MLFLOW'
+  | 'OPENMLDB'
 
 export const TASK_TYPES_MAP = {
   SHELL: {
@@ -113,5 +114,9 @@ export const TASK_TYPES_MAP = {
   MLFLOW: {
     alias: 'MLFLOW',
     helperLinkDisable: true
+  },
+  OPENMLDB: {
+    alias: 'OPENMLDB',
+    helperLinkDisable: true
   }
 } as { [key in TaskType]: { alias: string; helperLinkDisable?: boolean } }
diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
index 725ae513a0..bbe539d813 100644
--- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
+++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
@@ -167,6 +167,9 @@ $bgLight: #ffffff;
     &.icon-mlflow {
       background-image: url('/images/task-icons/mlflow.png');
     }
+    &.icon-openmldb {
+      background-image: url('/images/task-icons/openmldb.png');
+    }
   }
 
   &:hover {
@@ -237,6 +240,9 @@ $bgLight: #ffffff;
       &.icon-mlflow {
         background-image: url('/images/task-icons/mlflow.png');
       }
+      &.icon-openmldb {
+        background-image: url('/images/task-icons/openmldb_hover.png');
+      }
     }
   }
 }
diff --git a/pom.xml b/pom.xml
index 9522d5eb86..7b97d1f028 100644
--- a/pom.xml
+++ b/pom.xml
@@ -408,6 +408,11 @@
                 <artifactId>dolphinscheduler-task-api</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.dolphinscheduler</groupId>
+                <artifactId>dolphinscheduler-task-python</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.dolphinscheduler</groupId>
                 <artifactId>dolphinscheduler-task-all</artifactId>