You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/09/17 07:24:25 UTC

[dolphinscheduler] branch dev updated: [feat][python] Support OpenMLDB task in python api (#11944)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new e50a32d9cc [feat][python] Support OpenMLDB task in python api (#11944)
e50a32d9cc is described below

commit e50a32d9ccee57e4b56f63b9bee2c4fd888c0c32
Author: JieguangZhou <ji...@163.com>
AuthorDate: Sat Sep 17 15:24:17 2022 +0800

    [feat][python] Support OpenMLDB task in python api (#11944)
    
    Using the whole word zookeeper instead of short cut zk
    Co-authored-by: Jiajie Zhong <zh...@gmail.com>
---
 .../pydolphinscheduler/docs/source/tasks/index.rst |  1 +
 .../docs/source/tasks/{index.rst => openmldb.rst}  | 53 ++++++++--------
 .../examples/yaml_define/OpenMLDB.yaml             | 33 ++++++++++
 .../src/pydolphinscheduler/constants.py            |  1 +
 .../examples/task_openmldb_example.py              | 43 +++++++++++++
 .../src/pydolphinscheduler/tasks/__init__.py       |  2 +
 .../src/pydolphinscheduler/tasks/openmldb.py       | 48 ++++++++++++++
 .../tests/tasks/test_openmldb.py                   | 73 ++++++++++++++++++++++
 8 files changed, 226 insertions(+), 28 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
index a13652a526..5b9c165700 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
@@ -42,4 +42,5 @@ In this section
    sub_process
 
    sagemaker
+   openmldb
    pytorch
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/openmldb.rst
similarity index 60%
copy from dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
copy to dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/openmldb.rst
index a13652a526..125313dc21 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/openmldb.rst
@@ -15,31 +15,28 @@
    specific language governing permissions and limitations
    under the License.
 
-Tasks
-=====
-
-In this section 
-
-.. toctree::
-   :maxdepth: 1
-   
-   func_wrap
-   shell
-   sql
-   python
-   http
-
-   switch
-   condition
-   dependent
-
-   spark
-   flink
-   map_reduce
-   procedure
-
-   datax
-   sub_process
-
-   sagemaker
-   pytorch
+OpenMLDB
+=========
+
+
+An example of the OpenMLDB task type and in-depth information about it in **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_openmldb_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.openmldb
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/OpenMLDB.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/OpenMLDB.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/OpenMLDB.yaml
new file mode 100644
index 0000000000..b455cb0768
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/OpenMLDB.yaml
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "OpenMLDB"
+
+# Define the tasks under the workflow
+tasks:
+  - name: OpenMLDB
+    task_type: OpenMLDB
+    zookeeper: "127.0.0.1:2181"
+    zookeeper_path: "/openmldb"
+    execute_mode: "online"
+    sql: |
+      USE demo_db;
+      set @@job_timeout=200000;
+      LOAD DATA INFILE 'file:///tmp/train_sample.csv'
+      INTO TABLE talkingdata OPTIONS(mode='overwrite');
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 7eb5d04210..d8d2febfeb 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -58,6 +58,7 @@ class TaskType(str):
     SPARK = "SPARK"
     MR = "MR"
     SAGEMAKER = "SAGEMAKER"
+    OPENMLDB = "OPENMLDB"
     PYTORCH = "PYTORCH"
 
 
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_openmldb_example.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_openmldb_example.py
new file mode 100644
index 0000000000..5b90091ecf
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_openmldb_example.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+"""An example workflow for task openmldb."""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.openmldb import OpenMLDB
+
+sql = """USE demo_db;
+set @@job_timeout=200000;
+LOAD DATA INFILE 'file:///tmp/train_sample.csv'
+INTO TABLE talkingdata OPTIONS(mode='overwrite');
+"""
+
+with ProcessDefinition(
+    name="task_openmldb_example",
+    tenant="tenant_exists",
+) as pd:
+    task_openmldb = OpenMLDB(
+        name="task_openmldb",
+        zookeeper="127.0.0.1:2181",
+        zookeeper_path="/openmldb",
+        execute_mode="offline",
+        sql=sql,
+    )
+
+    pd.run()
+# [end workflow_declare]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
index 1481722433..e5b263c7c2 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
@@ -23,6 +23,7 @@ from pydolphinscheduler.tasks.dependent import Dependent
 from pydolphinscheduler.tasks.flink import Flink
 from pydolphinscheduler.tasks.http import Http
 from pydolphinscheduler.tasks.map_reduce import MR
+from pydolphinscheduler.tasks.openmldb import OpenMLDB
 from pydolphinscheduler.tasks.procedure import Procedure
 from pydolphinscheduler.tasks.python import Python
 from pydolphinscheduler.tasks.pytorch import Pytorch
@@ -41,6 +42,7 @@ __all__ = [
     "Flink",
     "Http",
     "MR",
+    "OpenMLDB",
     "Procedure",
     "Python",
     "Pytorch",
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/openmldb.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/openmldb.py
new file mode 100644
index 0000000000..5dad36ec11
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/openmldb.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task OpenMLDB."""
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+
+
+class OpenMLDB(Task):
+    """Task OpenMLDB object, declare behavior for OpenMLDB task to dolphinscheduler.
+
+    :param name: task name
+    :param zookeeper: OpenMLDB cluster zookeeper address, e.g. 127.0.0.1:2181.
+    :param zookeeper_path: OpenMLDB cluster zookeeper path, e.g. /openmldb.
+    :param execute_mode: Initial mode, offline or online; it can be switched in the sql statement itself.
+    :param sql: SQL statement.
+    """
+
+    _task_custom_attr = {
+        "zk",
+        "zk_path",
+        "execute_mode",
+        "sql",
+    }
+
+    def __init__(
+        self, name, zookeeper, zookeeper_path, execute_mode, sql, *args, **kwargs
+    ):
+        super().__init__(name, TaskType.OPENMLDB, *args, **kwargs)
+        self.zk = zookeeper  # stored under the short name listed in _task_custom_attr
+        self.zk_path = zookeeper_path  # likewise matches the "zk_path" custom attr
+        self.execute_mode = execute_mode
+        self.sql = sql
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_openmldb.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_openmldb.py
new file mode 100644
index 0000000000..f580ab06b2
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_openmldb.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task OpenMLDB."""
+from unittest.mock import patch
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.tasks.openmldb import OpenMLDB
+
+
+def test_openmldb_get_define():
+    """Test that OpenMLDB.get_define returns the expected task definition dict."""
+    zookeeper = "127.0.0.1:2181"
+    zookeeper_path = "/openmldb"
+    execute_mode = "offline"
+
+    sql = """USE demo_db;
+    set @@job_timeout=200000;
+    LOAD DATA INFILE 'file:///tmp/train_sample.csv'
+    INTO TABLE talkingdata OPTIONS(mode='overwrite');
+    """
+
+    code = 123  # returned by the patched gen_code_and_version below
+    version = 1
+    name = "test_openmldb_get_define"
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": TaskType.OPENMLDB,
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "zk": zookeeper,
+            "zkPath": zookeeper_path,
+            "executeMode": execute_mode,
+            "sql": sql,
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "environmentCode": None,
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        openmldb = OpenMLDB(name, zookeeper, zookeeper_path, execute_mode, sql)
+        assert openmldb.get_define() == expect