You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/07/22 02:47:13 UTC

[dolphinscheduler] branch dev updated: [python] Support SageMaker task type (#11002)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new cc088e0f54 [python] Support SageMaker task type (#11002)
cc088e0f54 is described below

commit cc088e0f54741c17359b33491a5eff2d2205d4cc
Author: JieguangZhou <ji...@163.com>
AuthorDate: Fri Jul 22 10:47:08 2022 +0800

    [python] Support SageMaker task type (#11002)
---
 .../pydolphinscheduler/docs/source/tasks/index.rst |   2 +
 .../docs/source/tasks/{index.rst => sagemaker.rst} |  42 ++++-----
 .../src/pydolphinscheduler/constants.py            |   1 +
 .../examples/task_sagemaker_example.py             |  46 ++++++++++
 .../src/pydolphinscheduler/tasks/sagemaker.py      |  40 ++++++++
 .../tests/tasks/test_sagemaker.py                  | 101 +++++++++++++++++++++
 6 files changed, 207 insertions(+), 25 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
index d6bbb960c1..30173f838b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
@@ -40,3 +40,5 @@ In this section
 
    datax
    sub_process
+
+   sagemaker
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sagemaker.rst
similarity index 69%
copy from dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
copy to dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sagemaker.rst
index d6bbb960c1..af627a929b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sagemaker.rst
@@ -15,28 +15,20 @@
    specific language governing permissions and limitations
    under the License.
 
-Tasks
-=====
-
-In this section 
-
-.. toctree::
-   :maxdepth: 1
-   
-   func_wrap
-   shell
-   sql
-   python
-   http
-
-   switch
-   condition
-   dependent
-
-   spark
-   flink
-   map_reduce
-   procedure
-
-   datax
-   sub_process
+SageMaker
+=========
+
+
+A SageMaker task type's example and dive into information of **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sagemaker_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.sagemaker
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index a5089ac165..4544a6989d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -57,6 +57,7 @@ class TaskType(str):
     FLINK = "FLINK"
     SPARK = "SPARK"
     MR = "MR"
+    SAGEMAKER = "SAGEMAKER"
 
 
 class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_sagemaker_example.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_sagemaker_example.py
new file mode 100644
index 0000000000..b056f61a63
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_sagemaker_example.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+"""An example workflow for task SageMaker."""
+import json
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.sagemaker import SageMaker
+
+sagemaker_request_data = {  # request body for SageMaker StartPipelineExecution
+    "ParallelismConfiguration": {"MaxParallelExecutionSteps": 1},
+    "PipelineExecutionDescription": "test Pipeline",
+    "PipelineExecutionDisplayName": "AbalonePipeline",
+    "PipelineName": "AbalonePipeline",
+    "PipelineParameters": [
+        {"Name": "ProcessingInstanceType", "Value": "ml.m4.xlarge"},
+        {"Name": "ProcessingInstanceCount", "Value": "2"},
+    ],
+}
+
+with ProcessDefinition(
+    name="task_sagemaker_example",
+    tenant="tenant_exists",
+) as pd:
+    task_sagemaker = SageMaker(  # the request is passed as a JSON string, not a dict
+        name="task_sagemaker",
+        sagemaker_request_json=json.dumps(sagemaker_request_data, indent=2),
+    )
+
+    pd.run()
+# [end workflow_declare]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sagemaker.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sagemaker.py
new file mode 100644
index 0000000000..30b128d172
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sagemaker.py
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task SageMaker."""
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+
+
+class SageMaker(Task):
+    """Task SageMaker object, declare behavior for SageMaker task to dolphinscheduler.
+
+    :param name: A unique, meaningful string for the SageMaker task.
+    :param sagemaker_request_json: JSON string with the request parameters of SageMaker
+        ``StartPipelineExecution``, stored as-is under ``sagemakerRequestJson`` in task params;
+        see also `AWS API
+        <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StartPipelineExecution.html>`_
+    """
+
+    _task_custom_attr = {  # copied into task params (key becomes ``sagemakerRequestJson``)
+        "sagemaker_request_json",
+    }
+
+    def __init__(self, name: str, sagemaker_request_json: str, *args, **kwargs):
+        super().__init__(name, TaskType.SAGEMAKER, *args, **kwargs)
+        self.sagemaker_request_json = sagemaker_request_json
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py
new file mode 100644
index 0000000000..8838eaf497
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task SageMaker."""
+import json
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.tasks.sagemaker import SageMaker
+
+sagemaker_request_json = json.dumps(  # same request body as the example workflow
+    {
+        "ParallelismConfiguration": {"MaxParallelExecutionSteps": 1},
+        "PipelineExecutionDescription": "test Pipeline",
+        "PipelineExecutionDisplayName": "AbalonePipeline",
+        "PipelineName": "AbalonePipeline",
+        "PipelineParameters": [
+            {"Name": "ProcessingInstanceType", "Value": "ml.m4.xlarge"},
+            {"Name": "ProcessingInstanceCount", "Value": "2"},
+        ],
+    },
+    indent=2,
+)
+
+
+@pytest.mark.parametrize(
+    "attr, expect",
+    [
+        (
+            {"sagemaker_request_json": sagemaker_request_json},
+            {
+                "sagemakerRequestJson": sagemaker_request_json,
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        )
+    ],
+)
+@patch(  # stub code/version generation with a fixed (code, version) pair
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test_property_task_params(mock_code_version, attr, expect):
+    """Test SageMaker ``task_params`` hold the request JSON plus the common task params."""
+    task = SageMaker("test-sagemaker-task-params", **attr)
+    assert expect == task.task_params
+
+
+def test_sagemaker_get_define():
+    """Test SageMaker.get_define returns the complete task definition dict."""
+    code = 123
+    version = 1
+    name = "test_sagemaker_get_define"
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,  # NOTE(review): same value as ``version`` above; could reference it
+        "description": None,
+        "delayTime": 0,
+        "taskType": "SAGEMAKER",
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "sagemakerRequestJson": sagemaker_request_json,
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(  # stub code/version generation so ``code`` and ``version`` are used
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        sagemaker = SageMaker(name, sagemaker_request_json)
+        assert sagemaker.get_define() == expect