Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/02 02:12:27 UTC
[dolphinscheduler-sdk-python] branch main updated: [fix] Resource upload and add document (#36)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new f9167c8 [fix] Resource upload and add document (#36)
f9167c8 is described below
commit f9167c8055c62cfe8572b67c1161546e889567b3
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Fri Dec 2 10:12:22 2022 +0800
[fix] Resource upload and add document (#36)
* Fix resource upload function
* Add some documentation about how to use resource uploading
---
docs/source/concept.rst | 78 +++++++++++++++++++
docs/source/howto/index.rst | 1 +
docs/source/howto/multi-resources.rst | 75 ++++++++++++++++++
src/pydolphinscheduler/core/task.py | 11 ++-
src/pydolphinscheduler/core/workflow.py | 10 ++-
.../examples/multi_resources_example.py | 88 ++++++++++++++++++++++
src/pydolphinscheduler/models/tenant.py | 2 +-
tests/integration/test_resources.py | 61 +++++++++++++++
tests/testing/constants.py | 3 +
9 files changed, 320 insertions(+), 9 deletions(-)
diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index f5bc367..3b2f3d0 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -182,6 +182,84 @@ decide workflow of task. You could set `workflow` in both normal assign or in co
With both `Workflow`_, `Tasks`_ and `Tasks Dependence`_, we could build a workflow with multiple tasks.
+Resource Files
+--------------
+
+While a workflow is running, tasks often need resource files. A common situation is that we already have some
+executable files locally and need to run them on a schedule, or add them to an existing workflow as new tasks.
+Of course, we can upload those files to the target machine and run them in a :doc:`shell task <tasks/shell>` by
+referencing the absolute path of each file. But if we have more than one machine to run tasks, we have to upload
+those files to each of them. That is neither convenient nor flexible, because we may need to change our resource
+files from time to time.
+
+The more idiomatic pydolphinscheduler way is to upload those files together with the `workflow`_ and use them in tasks.
+For example, you have a bash script named ``echo-ten.sh`` locally, and it contains some code like this:
+
+.. code-block:: bash
+
+ #!/usr/bin/env bash
+ max=10
+ for ((i=1; i <= $max; ++i)); do
+ echo "$i"
+ done
+
+and you want to use it in a workflow without copy-pasting it into the shell task parameter ``command``. You can
+use this mechanism to upload it to the resource center when you create the workflow:
+
+.. code-block:: python
+
+ # Read file content
+ file_name = "echo-ten.sh"
+
+ with open(file_name, "r") as f:
+ content = f.read()
+
+ with Workflow(
+ name="upload_and_run",
+ resource_list=[
+ Resource(name=file_name, content=content),
+ ],
+ ) as workflow:
+
+When we call :code:`workflow.run()`, the new file named ``echo-ten.sh`` is uploaded to the dolphinscheduler
+resource center.
+
+After that, we can use this file in our task by referencing it by name. In this case we use a :doc:`shell task <tasks/shell>`
+to run it.
+
+.. code-block:: python
+
+ # We use `shell` task to run `echo-ten.sh` file
+ shell_task = Shell(
+ name="run",
+ command=f"bash {file_name}",
+ resource_list=[
+ file_name
+ ],
+ )
+
+While the workflow is running, the file is downloaded to the task's runtime working directory, which means you can
+execute it. We execute the file with ``bash`` and reference it directly by name.
+
+Please note that we can also reference resource files that already exist in the dolphinscheduler resource center, which
+means we can use them in a task by name without uploading them again. We can also upload files to the
+resource center directly, without a workflow.
+
+.. code-block:: python
+
+ # Upload file to resource center
+ from pydolphinscheduler.core.resource import Resource
+
+ resource = Resource(name="bare-create.py", user_name="<USER-MUST-EXISTS-WITH-TENANT>", content="print('Bare create resource')")
+ resource.create_or_update_resource()
+
+After that, we can see that a new file named ``bare-create.py`` has been created in the resource center.
+
+.. note::
+
+ The ``resource_list`` parameter of both workflow and task accepts a list, which means you can upload and reference
+ multiple files. For more complex usage, see :doc:`howto/multi-resources`.
+
Authentication Token
--------------------
diff --git a/docs/source/howto/index.rst b/docs/source/howto/index.rst
index a0b3c29..bcf6f44 100644
--- a/docs/source/howto/index.rst
+++ b/docs/source/howto/index.rst
@@ -28,3 +28,4 @@ Currently, the HOWTOs are:
:maxdepth: 2
remote-submit
+ multi-resources
diff --git a/docs/source/howto/multi-resources.rst b/docs/source/howto/multi-resources.rst
new file mode 100644
index 0000000..3b5e579
--- /dev/null
+++ b/docs/source/howto/multi-resources.rst
@@ -0,0 +1,75 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Upload and Use Multiple Resources
+=================================
+
+The resource center helps us manage resources in a centralized way, making them easy to change and distribute to all workers.
+For more detail, see :ref:`resources files <concept:resource files>`.
+
+In this section we show how to upload and use multiple resources, which is the more common case in production
+environments and in the real world.
+
+Overview
+--------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/multi_resources_example.py
+ :dedent: 0
+ :start-after: [start workflow]
+ :end-before: [end workflow]
+
+In this example, we upload two Python files to the resource center and use them in a single task. The
+files are named ``dependence.py`` and ``main.py``. ``main.py`` uses ``dependence.py`` as a dependency and
+reads a variable ``now`` declared in it. So a shell task can call :code:`python main.py`
+to get everything done.
+
+Upload Resources
+----------------
+
+First, import the ``Resource`` class.
+
+.. code-block:: python
+
+ from pydolphinscheduler.core.resource import Resource
+
+Then we create two resource objects and assign them to the ``resource_list`` of the workflow. The full content of
+each resource should be assigned to the ``content`` attribute of the resource object. Note that ``main.py`` imports
+the variable :code:`now` from ``dependence.py``.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/multi_resources_example.py
+ :dedent: 0
+ :start-after: [start create_new_resources]
+ :end-before: [end create_new_resources]
+
+Use Resources
+-------------
+
+As with :ref:`using a single resource <concept:resource files>`, all we need to do is assign them to the ``resource_list``
+attribute of the task and then call the main file to run our task. In this example, we call :code:`python main.py`,
+which uses ``dependence.py`` as a dependency.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/multi_resources_example.py
+ :dedent: 0
+ :start-after: [start use_exists_resources]
+ :end-before: [end use_exists_resources]
+
+After the workflow runs, it executes ``main.py`` and prints the current datetime. The result looks like this:
+
+.. code-block:: text
+
+ 2022-11-29 16:16:51.952742
+
diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py
index cff2917..efab3e5 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -18,6 +18,7 @@
"""DolphinScheduler Task and TaskRelation object."""
import copy
import types
+import warnings
from logging import getLogger
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
@@ -190,7 +191,7 @@ class Task(Base):
self._workflow = workflow
@property
- def resource_list(self) -> List:
+ def resource_list(self) -> List[Dict[str, Resource]]:
"""Get task define attribute `resource_list`."""
resources = set()
for res in self._resource_list:
@@ -199,17 +200,19 @@ class Task(Base):
Resource(name=res, user_name=self.user_name).get_id_from_database()
)
elif type(res) == dict and res.get(ResourceKey.ID) is not None:
- logger.warning(
+ warnings.warn(
"""`resource_list` should be defined using List[str] with resource paths,
the use of ids to define resources will be removed in version 3.2.0.
- """
+ """,
+ DeprecationWarning,
+ stacklevel=2,
)
resources.add(res.get(ResourceKey.ID))
return [{ResourceKey.ID: r} for r in resources]
@property
def user_name(self) -> Optional[str]:
- """Return user name of workflow."""
+ """Return username of workflow."""
if self.workflow:
return self.workflow.user.name
else:
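Note on the hunk above: it replaces `logger.warning` with `warnings.warn(..., DeprecationWarning, stacklevel=2)`. The sketch below illustrates that pattern in isolation, using only the standard library. The function `resolve_resources` and the `"id"` key are hypothetical stand-ins for illustration, not the library's actual code.

```python
import warnings


def resolve_resources(resource_list):
    """Hypothetical stand-in for a property that accepts names or id dicts."""
    resolved = []
    for res in resource_list:
        if isinstance(res, dict) and res.get("id") is not None:
            # stacklevel=2 attributes the warning to the caller's line,
            # which is where the deprecated usage actually lives.
            warnings.warn(
                "defining resources by id is deprecated; use a list of paths",
                DeprecationWarning,
                stacklevel=2,
            )
            resolved.append(res["id"])
        else:
            resolved.append(res)
    return resolved


# Record warnings so they can be inspected instead of printed.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = resolve_resources([{"id": 7}, "main.py"])

deprecations = [w for w in caught if issubclass(w.category, DeprecationWarning)]
print(result, len(deprecations))
```

Unlike a log record, a `DeprecationWarning` can be filtered, escalated to an error, or asserted on in tests through the standard `warnings` machinery.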
diff --git a/src/pydolphinscheduler/core/workflow.py b/src/pydolphinscheduler/core/workflow.py
index db916d8..02906fb 100644
--- a/src/pydolphinscheduler/core/workflow.py
+++ b/src/pydolphinscheduler/core/workflow.py
@@ -419,6 +419,12 @@ class Workflow(Base):
self._ensure_side_model_exists()
self._pre_submit_check()
+ # resource should be created before workflow
+ if len(self.resource_list) > 0:
+ for res in self.resource_list:
+ res.user_name = self._user
+ res.create_or_update_resource()
+
self._workflow_code = gateway.create_or_update_workflow(
self._user,
self._project,
@@ -438,10 +444,6 @@ class Workflow(Base):
json.dumps(self.schedule_json) if self.schedule_json else None,
None,
)
- if len(self.resource_list) > 0:
- for res in self.resource_list:
- res.user_name = self._user
- res.create_or_update_resource()
return self._workflow_code
def start(self) -> None:
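Note on the hunk above: it moves resource creation ahead of `create_or_update_workflow`. One plausible reading, given that `Task.resource_list` resolves names through `get_id_from_database()`, is that a name cannot be resolved until the resource exists. The sketch below models that ordering with a hypothetical in-memory `FakeResourceCenter`; none of these names come from the library.

```python
class FakeResourceCenter:
    """Hypothetical in-memory stand-in for the resource center."""

    def __init__(self):
        self._ids = {}

    def create_or_update(self, name, content):
        # Assign a new id on first upload, keep it on later updates.
        self._ids.setdefault(name, len(self._ids) + 1)
        return self._ids[name]

    def get_id(self, name):
        if name not in self._ids:
            raise LookupError(f"resource {name!r} does not exist yet")
        return self._ids[name]


def submit(center, uploads, task_resources, upload_first):
    """Resolve task resource names to ids, uploading before or after."""
    if upload_first:
        for name, content in uploads.items():
            center.create_or_update(name, content)
    ids = [center.get_id(name) for name in task_resources]
    if not upload_first:
        for name, content in uploads.items():
            center.create_or_update(name, content)
    return ids


uploads = {"main.py": "print('hi')"}

# New order: upload, then resolve.
ids_new_order = submit(FakeResourceCenter(), uploads, ["main.py"], upload_first=True)

# Old order: resolve, then upload; the lookup fails on a fresh resource center.
try:
    submit(FakeResourceCenter(), uploads, ["main.py"], upload_first=False)
    old_order_failed = False
except LookupError:
    old_order_failed = True

print(ids_new_order, old_order_failed)
```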
diff --git a/src/pydolphinscheduler/examples/multi_resources_example.py b/src/pydolphinscheduler/examples/multi_resources_example.py
new file mode 100644
index 0000000..dd06bd0
--- /dev/null
+++ b/src/pydolphinscheduler/examples/multi_resources_example.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This example shows how to upload files to dolphinscheduler resource center and use them in tasks.
+
+When you want to create a new resource file in resource center, you can add it to ``workflow.resource_list``
+using the following code:
+
+.. code-block:: python
+
+ with Workflow(
+ name="use_resource_center",
+ resource_list=[
+ Resource(name="new-name.py", content="print('hello world from resource center')"),
+ ],
+ ) as workflow:
+
+When the workflow is run, the resource file will be created and uploaded to dolphinscheduler resource
+center automatically.
+
+If you want to use the resource file in tasks, you can also use the ``resource_list`` parameter in the task
+constructor, just like the following code:
+
+.. code-block:: python
+
+ task_use_resource = Shell(
+ name="run_resource",
+ command="python new-name.py",
+ resource_list=[
+ "new-name.py",
+ ],
+ )
+
+and the resource file will be downloaded to the task runtime working directory, which means you can execute
+it. In this example we run the file ``new-name.py`` just as we would execute a Python script in a terminal. We can
+also use resource files already in dolphinscheduler resource center, not only the new ones created in the
+current workflow.
+"""
+
+# [start workflow]
+from pydolphinscheduler.core import Workflow
+from pydolphinscheduler.core.resource import Resource
+from pydolphinscheduler.tasks import Shell
+
+dependence = "dependence.py"
+main = "main.py"
+
+with Workflow(
+ name="multi_resources_example",
+ tenant="tenant_exists",
+ # [start create_new_resources]
+ resource_list=[
+ Resource(
+ name=dependence,
+ content="from datetime import datetime\nnow = datetime.now()",
+ ),
+ Resource(name=main, content="from dependence import now\nprint(now)"),
+ ],
+ # [end create_new_resources]
+) as pd:
+ # [start use_exists_resources]
+ task_use_resource = Shell(
+ name="use-resource",
+ command=f"python {main}",
+ resource_list=[
+ dependence,
+ main,
+ ],
+ )
+ # [end use_exists_resources]
+
+ pd.run()
+# [end workflow]
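Note on the example above: the two resource contents can be checked locally without a DolphinScheduler server. The sketch below writes the same two files into a temporary directory and runs `python main.py` there, assuming that this roughly mirrors how both files end up side by side in the task's working directory. It uses only the standard library and does not exercise the resource center itself.

```python
import subprocess
import sys
import tempfile
from datetime import datetime
from pathlib import Path

# Same file contents as the Resource objects in the example.
files = {
    "dependence.py": "from datetime import datetime\nnow = datetime.now()",
    "main.py": "from dependence import now\nprint(now)",
}

with tempfile.TemporaryDirectory() as workdir:
    for name, content in files.items():
        Path(workdir, name).write_text(content)
    # Run main.py from inside the directory so `import dependence` resolves.
    completed = subprocess.run(
        [sys.executable, "main.py"],
        cwd=workdir,
        capture_output=True,
        text=True,
        check=True,
    )

output = completed.stdout.strip()
# The printed value is str(datetime.now()), which fromisoformat can parse.
parsed = datetime.fromisoformat(output)
print(output)
```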
diff --git a/src/pydolphinscheduler/models/tenant.py b/src/pydolphinscheduler/models/tenant.py
index 8b64707..146aec0 100644
--- a/src/pydolphinscheduler/models/tenant.py
+++ b/src/pydolphinscheduler/models/tenant.py
@@ -46,7 +46,7 @@ class Tenant(BaseSide):
self, queue_name: str, user=configuration.USER_NAME
) -> None:
"""Create Tenant if not exists."""
- tenant = gateway.create_tenant(self.name, self.description, queue_name)
+ tenant = gateway.create_tenant(self.name, queue_name, self.description)
self.tenant_id = tenant.getId()
self.code = tenant.getTenantCode()
# gateway_result_checker(result, None)
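Note on the hunk above: the fix swaps two positional string arguments. Because both are strings, the wrong order raises no error and simply stores the values in the wrong fields. The sketch below reproduces that failure mode with a hypothetical Python stand-in; the parameter order `(name, queue_name, description)` is assumed from the fixed call, and the function is not the real gateway method.

```python
def create_tenant(name: str, queue_name: str, description: str) -> dict:
    """Hypothetical stand-in using the parameter order the fix implies."""
    return {"name": name, "queue": queue_name, "description": description}


# Order before the fix: the description lands in the queue slot.
swapped = create_tenant("tenant_exists", "a test tenant", "default")

# Order after the fix.
fixed = create_tenant("tenant_exists", "default", "a test tenant")

print(swapped["queue"])
print(fixed["queue"])
```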
diff --git a/tests/integration/test_resources.py b/tests/integration/test_resources.py
new file mode 100644
index 0000000..ee1d99d
--- /dev/null
+++ b/tests/integration/test_resources.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test pydolphinscheduler resources."""
+
+import pytest
+from py4j.java_gateway import JavaObject
+
+from pydolphinscheduler.core.resource import Resource
+from pydolphinscheduler.models.user import User
+from tests.testing.constants import UNIT_TEST_TENANT, UNIT_TEST_USER_NAME
+
+name = "unittest_resource.txt"
+content = "unittest_resource_content"
+
+
+@pytest.fixture(scope="module")
+def tmp_user():
+ """Get a temporary user."""
+ user = User(
+ name=UNIT_TEST_USER_NAME,
+ password="unittest-password",
+ email="test-email@abc.com",
+ phone="17366637777",
+ tenant=UNIT_TEST_TENANT,
+ queue="test-queue",
+ status=1,
+ )
+ user.create_if_not_exists()
+ yield
+ user.delete()
+
+
+def test_create_or_update(tmp_user):
+ """Test create or update resource to java gateway."""
+ resource = Resource(name=name, content=content, user_name=UNIT_TEST_USER_NAME)
+ result = resource.create_or_update_resource()
+ assert result is not None and isinstance(result, JavaObject)
+ assert result.getAlias() == name
+
+
+def test_get_resource_info(tmp_user):
+ """Test get resource info from java gateway."""
+ resource = Resource(name=name, user_name=UNIT_TEST_USER_NAME)
+ result = resource.get_info_from_database()
+ assert result is not None and isinstance(result, JavaObject)
+ assert result.getAlias() == name
diff --git a/tests/testing/constants.py b/tests/testing/constants.py
index 291a5a1..dbbf5e5 100644
--- a/tests/testing/constants.py
+++ b/tests/testing/constants.py
@@ -49,3 +49,6 @@ DEV_MODE = str(
# default token
TOKEN = "jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc"
+
+UNIT_TEST_USER_NAME = "unittest_user"
+UNIT_TEST_TENANT = "unittest_tenant"