You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/07/22 02:47:13 UTC
[dolphinscheduler] branch dev updated: [python] Support SageMaker task type (#11002)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new cc088e0f54 [python] Support SageMaker task type (#11002)
cc088e0f54 is described below
commit cc088e0f54741c17359b33491a5eff2d2205d4cc
Author: JieguangZhou <ji...@163.com>
AuthorDate: Fri Jul 22 10:47:08 2022 +0800
[python] Support SageMaker task type (#11002)
---
.../pydolphinscheduler/docs/source/tasks/index.rst | 2 +
.../docs/source/tasks/{index.rst => sagemaker.rst} | 42 ++++-----
.../src/pydolphinscheduler/constants.py | 1 +
.../examples/task_sagemaker_example.py | 46 ++++++++++
.../src/pydolphinscheduler/tasks/sagemaker.py | 40 ++++++++
.../tests/tasks/test_sagemaker.py | 101 +++++++++++++++++++++
6 files changed, 207 insertions(+), 25 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
index d6bbb960c1..30173f838b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
@@ -40,3 +40,5 @@ In this section
datax
sub_process
+
+ sagemaker
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sagemaker.rst
similarity index 69%
copy from dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
copy to dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sagemaker.rst
index d6bbb960c1..af627a929b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sagemaker.rst
@@ -15,28 +15,20 @@
specific language governing permissions and limitations
under the License.
-Tasks
-=====
-
-In this section
-
-.. toctree::
- :maxdepth: 1
-
- func_wrap
- shell
- sql
- python
- http
-
- switch
- condition
- dependent
-
- spark
- flink
- map_reduce
- procedure
-
- datax
- sub_process
+SageMaker
+=========
+
+
+A SageMaker task type's example and dive into information of **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sagemaker_example.py
+ :start-after: [start workflow_declare]
+ :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.sagemaker
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index a5089ac165..4544a6989d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -57,6 +57,7 @@ class TaskType(str):
FLINK = "FLINK"
SPARK = "SPARK"
MR = "MR"
+ SAGEMAKER = "SAGEMAKER"
class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_sagemaker_example.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_sagemaker_example.py
new file mode 100644
index 0000000000..b056f61a63
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_sagemaker_example.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+"""An example workflow for task sagemaker."""
+import json
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.sagemaker import SageMaker
+
+sagemaker_request_data = {
+ "ParallelismConfiguration": {"MaxParallelExecutionSteps": 1},
+ "PipelineExecutionDescription": "test Pipeline",
+ "PipelineExecutionDisplayName": "AbalonePipeline",
+ "PipelineName": "AbalonePipeline",
+ "PipelineParameters": [
+ {"Name": "ProcessingInstanceType", "Value": "ml.m4.xlarge"},
+ {"Name": "ProcessingInstanceCount", "Value": "2"},
+ ],
+}
+
+with ProcessDefinition(
+ name="task_sagemaker_example",
+ tenant="tenant_exists",
+) as pd:
+ task_sagemaker = SageMaker(
+ name="task_sagemaker",
+ sagemaker_request_json=json.dumps(sagemaker_request_data, indent=2),
+ )
+
+ pd.run()
+# [end workflow_declare]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sagemaker.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sagemaker.py
new file mode 100644
index 0000000000..30b128d172
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sagemaker.py
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task SageMaker."""
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+
+
+class SageMaker(Task):
+ """Task SageMaker object, declare behavior for SageMaker task to dolphinscheduler.
+
+ :param name: A unique, meaningful string for the SageMaker task.
+ :param sagemaker_request_json: Request parameters of StartPipelineExecution,
+ see also `AWS API
+ <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StartPipelineExecution.html>`_
+
+ """
+
+ _task_custom_attr = {
+ "sagemaker_request_json",
+ }
+
+ def __init__(self, name: str, sagemaker_request_json: str, *args, **kwargs):
+ super().__init__(name, TaskType.SAGEMAKER, *args, **kwargs)
+ self.sagemaker_request_json = sagemaker_request_json
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py
new file mode 100644
index 0000000000..8838eaf497
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task SageMaker."""
+import json
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.tasks.sagemaker import SageMaker
+
+sagemaker_request_json = json.dumps(
+ {
+ "ParallelismConfiguration": {"MaxParallelExecutionSteps": 1},
+ "PipelineExecutionDescription": "test Pipeline",
+ "PipelineExecutionDisplayName": "AbalonePipeline",
+ "PipelineName": "AbalonePipeline",
+ "PipelineParameters": [
+ {"Name": "ProcessingInstanceType", "Value": "ml.m4.xlarge"},
+ {"Name": "ProcessingInstanceCount", "Value": "2"},
+ ],
+ },
+ indent=2,
+)
+
+
+@pytest.mark.parametrize(
+ "attr, expect",
+ [
+ (
+ {"sagemaker_request_json": sagemaker_request_json},
+ {
+ "sagemakerRequestJson": sagemaker_request_json,
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "waitStartTimeout": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ },
+ )
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_property_task_params(mock_code_version, attr, expect):
+ """Test task sagemaker task property."""
+ task = SageMaker("test-sagemaker-task-params", **attr)
+ assert expect == task.task_params
+
+
+def test_sagemaker_get_define():
+ """Test task sagemaker function get_define."""
+ code = 123
+ version = 1
+ name = "test_sagemaker_get_define"
+ expect = {
+ "code": code,
+ "name": name,
+ "version": 1,
+ "description": None,
+ "delayTime": 0,
+ "taskType": "SAGEMAKER",
+ "taskParams": {
+ "resourceList": [],
+ "localParams": [],
+ "sagemakerRequestJson": sagemaker_request_json,
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
+ },
+ "flag": "YES",
+ "taskPriority": "MEDIUM",
+ "workerGroup": "default",
+ "failRetryTimes": 0,
+ "failRetryInterval": 1,
+ "timeoutFlag": "CLOSE",
+ "timeoutNotifyStrategy": None,
+ "timeout": 0,
+ }
+ with patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(code, version),
+ ):
+ sagemaker = SageMaker(name, sagemaker_request_json)
+ assert sagemaker.get_define() == expect