You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/11/16 10:40:44 UTC

[dolphinscheduler-sdk-python] branch main updated: [feat][task] Support Kubernetes task (#15)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 89f7407  [feat][task] Support Kubernetes task (#15)
89f7407 is described below

commit 89f740728bcf210e91e240d65b136b913e3b607e
Author: tuchg <24...@users.noreply.github.com>
AuthorDate: Wed Nov 16 18:40:39 2022 +0800

    [feat][task] Support Kubernetes task (#15)
---
 docs/source/tasks/index.rst                        |  1 +
 docs/source/tasks/{index.rst => kubernetes.rst}    | 56 ++++++++----------
 examples/yaml_define/Kubernetes.yaml               | 29 +++++++++
 src/pydolphinscheduler/constants.py                |  1 +
 .../examples/task_kubernetes_example.py            | 36 ++++++++++++
 src/pydolphinscheduler/tasks/__init__.py           |  2 +
 src/pydolphinscheduler/tasks/kubernetes.py         | 55 +++++++++++++++++
 tests/tasks/test_kubernetes.py                     | 68 ++++++++++++++++++++++
 8 files changed, 217 insertions(+), 31 deletions(-)

diff --git a/docs/source/tasks/index.rst b/docs/source/tasks/index.rst
index 3f83f92..dba3503 100644
--- a/docs/source/tasks/index.rst
+++ b/docs/source/tasks/index.rst
@@ -37,6 +37,7 @@ In this section
    flink
    map_reduce
    procedure
+   kubernetes
 
    datax
    sub_process
diff --git a/docs/source/tasks/index.rst b/docs/source/tasks/kubernetes.rst
similarity index 60%
copy from docs/source/tasks/index.rst
copy to docs/source/tasks/kubernetes.rst
index 3f83f92..563605d 100644
--- a/docs/source/tasks/index.rst
+++ b/docs/source/tasks/kubernetes.rst
@@ -15,34 +15,28 @@
    specific language governing permissions and limitations
    under the License.
 
-Tasks
-=====
-
-In this section 
-
-.. toctree::
-   :maxdepth: 1
-   
-   func_wrap
-   shell
-   sql
-   python
-   http
-
-   switch
-   condition
-   dependent
-
-   spark
-   flink
-   map_reduce
-   procedure
-
-   datax
-   sub_process
-
-   sagemaker
-   mlflow
-   openmldb
-   pytorch
-   dvc
+Kubernetes
+==========
+
+
+A Kubernetes task type's example and dive into information of **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_kubernetes_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.kubernetes
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Kubernetes.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/examples/yaml_define/Kubernetes.yaml b/examples/yaml_define/Kubernetes.yaml
new file mode 100644
index 0000000..3197931
--- /dev/null
+++ b/examples/yaml_define/Kubernetes.yaml
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "kubernetes"
+
+# Define the tasks under the workflow
+tasks:
+  - name: kubernetes
+    task_type: K8S
+    image: ds-dev
+    namespace: '{ "name": "default","cluster": "lab" }'
+    minCpuCores: 2.0
+    minMemorySpace: 10.0
\ No newline at end of file
diff --git a/src/pydolphinscheduler/constants.py b/src/pydolphinscheduler/constants.py
index bedbbf2..0e19577 100644
--- a/src/pydolphinscheduler/constants.py
+++ b/src/pydolphinscheduler/constants.py
@@ -62,6 +62,7 @@ class TaskType(str):
     OPENMLDB = "OPENMLDB"
     PYTORCH = "PYTORCH"
     DVC = "DVC"
+    KUBERNETES = "K8S"
 
 
 class DefaultTaskCodeNum(str):
diff --git a/src/pydolphinscheduler/examples/task_kubernetes_example.py b/src/pydolphinscheduler/examples/task_kubernetes_example.py
new file mode 100644
index 0000000..b7a6a8a
--- /dev/null
+++ b/src/pydolphinscheduler/examples/task_kubernetes_example.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+"""A example workflow for task kubernetes."""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.kubernetes import Kubernetes
+
+with ProcessDefinition(
+    name="task_kubernetes_example",
+    tenant="tenant_exists",
+) as pd:
+    task_k8s = Kubernetes(
+        name="task_k8s",
+        image="ds-dev",
+        namespace=str({"name": "default", "cluster": "lab"}),
+        min_cpu_cores=2.0,
+        min_memory_space=10.0,
+    )
+    pd.submit()
+# [end workflow_declare]
diff --git a/src/pydolphinscheduler/tasks/__init__.py b/src/pydolphinscheduler/tasks/__init__.py
index 972b1b7..4dc2a90 100644
--- a/src/pydolphinscheduler/tasks/__init__.py
+++ b/src/pydolphinscheduler/tasks/__init__.py
@@ -23,6 +23,7 @@ from pydolphinscheduler.tasks.dependent import Dependent
 from pydolphinscheduler.tasks.dvc import DVCDownload, DVCInit, DVCUpload
 from pydolphinscheduler.tasks.flink import Flink
 from pydolphinscheduler.tasks.http import Http
+from pydolphinscheduler.tasks.kubernetes import Kubernetes
 from pydolphinscheduler.tasks.map_reduce import MR
 from pydolphinscheduler.tasks.mlflow import (
     MLflowModels,
@@ -66,4 +67,5 @@ __all__ = [
     "SubProcess",
     "Switch",
     "SageMaker",
+    "Kubernetes",
 ]
diff --git a/src/pydolphinscheduler/tasks/kubernetes.py b/src/pydolphinscheduler/tasks/kubernetes.py
new file mode 100644
index 0000000..b902927
--- /dev/null
+++ b/src/pydolphinscheduler/tasks/kubernetes.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Kubernetes."""
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+
+
+class Kubernetes(Task):
+    """Task Kubernetes object, declare behavior for Kubernetes task to dolphinscheduler.
+
+    :param name: task name
+    :param image: the registry url for image.
+    :param namespace: the namespace for running Kubernetes task.
+    :param min_cpu_cores: min CPU requirement for running Kubernetes task.
+    :param min_memory_space: min memory requirement for running Kubernetes task.
+    :param params_map: It is a local user-defined parameter for Kubernetes task.
+    """
+
+    _task_custom_attr = {
+        "image",
+        "namespace",
+        "min_cpu_cores",
+        "min_memory_space",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        image: str,
+        namespace: str,
+        min_cpu_cores: float,
+        min_memory_space: float,
+        *args,
+        **kwargs
+    ):
+        super().__init__(name, TaskType.KUBERNETES, *args, **kwargs)
+        self.image = image
+        self.namespace = namespace
+        self.min_cpu_cores = min_cpu_cores
+        self.min_memory_space = min_memory_space
diff --git a/tests/tasks/test_kubernetes.py b/tests/tasks/test_kubernetes.py
new file mode 100644
index 0000000..d9b5f75
--- /dev/null
+++ b/tests/tasks/test_kubernetes.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Kubernetes."""
+
+from unittest.mock import patch
+
+from pydolphinscheduler.tasks.kubernetes import Kubernetes
+
+
+def test_kubernetes_get_define():
+    """Test task kubernetes function get_define."""
+    code = 123
+    version = 1
+    name = "test_kubernetes_get_define"
+    image = "ds-dev"
+    namespace = str({"name": "default", "cluster": "lab"})
+    minCpuCores = 2.0
+    minMemorySpace = 10.0
+
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "K8S",
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "image": image,
+            "namespace": namespace,
+            "minCpuCores": minCpuCores,
+            "minMemorySpace": minMemorySpace,
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "environmentCode": None,
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        k8s = Kubernetes(name, image, namespace, minCpuCores, minMemorySpace)
+        assert k8s.get_define() == expect