Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/02 02:12:27 UTC

[dolphinscheduler-sdk-python] branch main updated: [fix] Resource upload and add document (#36)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new f9167c8  [fix] Resource upload and add document (#36)
f9167c8 is described below

commit f9167c8055c62cfe8572b67c1161546e889567b3
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Fri Dec 2 10:12:22 2022 +0800

    [fix] Resource upload and add document (#36)
    
    * Fix resource upload function
    * Add some documentation about how to use resource uploading
---
 docs/source/concept.rst                            | 78 +++++++++++++++++++
 docs/source/howto/index.rst                        |  1 +
 docs/source/howto/multi-resources.rst              | 75 ++++++++++++++++++
 src/pydolphinscheduler/core/task.py                | 11 ++-
 src/pydolphinscheduler/core/workflow.py            | 10 ++-
 .../examples/multi_resources_example.py            | 88 ++++++++++++++++++++++
 src/pydolphinscheduler/models/tenant.py            |  2 +-
 tests/integration/test_resources.py                | 61 +++++++++++++++
 tests/testing/constants.py                         |  3 +
 9 files changed, 320 insertions(+), 9 deletions(-)

diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index f5bc367..3b2f3d0 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -182,6 +182,84 @@ decide workflow of task. You could set `workflow` in both normal assign or in co
 
 With both `Workflow`_, `Tasks`_  and `Tasks Dependence`_, we could build a workflow with multiple tasks.
 
+Resource Files
+--------------
+
+While a workflow is running, its tasks often need resource files. A common situation is that we already
+have some executable files locally and need to schedule them at a specific time, or add them to an existing
+workflow as new tasks. Of course, we could upload those files to the target machine and run them in a
+:doc:`shell task <tasks/shell>` by referencing the absolute path of each file. But if more than one machine
+runs tasks, we have to upload those files to every one of them. That is neither convenient nor flexible,
+because we may need to change our resource files from time to time.
+
+The more idiomatic pydolphinscheduler way is to upload those files together with the `workflow`_ and use them
+in tasks. For example, suppose you have a bash script named ``echo-ten.sh`` locally that contains code like this:
+
+.. code-block:: bash
+
+   #!/usr/bin/env bash
+   max=10
+   for ((i=1; i <= $max; ++i)); do
+       echo "$i"
+   done
+
+and you want to use it in a workflow without copy-pasting it into the ``command`` parameter of a shell task.
+You can use this mechanism to upload it to the resource center when you create the workflow:
+
+.. code-block:: python
+
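+   from pydolphinscheduler.core import Workflow
+   from pydolphinscheduler.core.resource import Resource
+
+   file_name = "echo-ten.sh"
+   with open(file_name, "r") as f:
+       content = f.read()  # read the local script content
+
+   with Workflow(
+       name="upload_and_run",
+       resource_list=[Resource(name=file_name, content=content)],
+   ) as workflow:
+       ...  # define tasks here, then call workflow.run()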
+
+When we call :code:`workflow.run()`, the new file named ``echo-ten.sh`` is uploaded to the dolphinscheduler
+resource center.
+
+After that, we can use this file in a task by referencing it by name. In this case we use a
+:doc:`shell task <tasks/shell>` to run it.
+
+.. code-block:: python
+
+   # We use `shell` task to run `echo-ten.sh` file
+   shell_task = Shell(
+      name="run",
+      command=f"bash {file_name}",
+      resource_list=[
+         file_name
+      ],
+   )
+
+While the workflow is running, the file is downloaded to the task's runtime working directory, which means you
+can execute it. Here we execute the file with ``bash`` and reference it directly by name.
+
+Note that we can also reference a resource file that already exists in the dolphinscheduler resource center, which
+means we can use resource center files in a task by name without uploading them again. We can also upload files to
+the resource center directly, without a workflow.
+
+.. code-block:: python
+
+   # Upload file to resource center
+   from pydolphinscheduler.core.resource import Resource
+
+   resource = Resource(name="bare-create.py", user_name="<USER-MUST-EXISTS-WITH-TENANT>", content="print('Bare create resource')")
+   resource.create_or_update_resource()
+
+After that, we can see that a new file named ``bare-create.py`` has been created in the resource center.
+
+.. note::
+
+   The ``resource_list`` parameter of both the workflow and the task is a list, which means you can upload and
+   reference multiple files. For more complex usage, see :doc:`howto/multi-resources`.
+
 Authentication Token
 --------------------
 
diff --git a/docs/source/howto/index.rst b/docs/source/howto/index.rst
index a0b3c29..bcf6f44 100644
--- a/docs/source/howto/index.rst
+++ b/docs/source/howto/index.rst
@@ -28,3 +28,4 @@ Currently, the HOWTOs are:
    :maxdepth: 2
    
    remote-submit
+   multi-resources
diff --git a/docs/source/howto/multi-resources.rst b/docs/source/howto/multi-resources.rst
new file mode 100644
index 0000000..3b5e579
--- /dev/null
+++ b/docs/source/howto/multi-resources.rst
@@ -0,0 +1,75 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Upload and Use Multiple Resources
+=================================
+
+The resource center helps us manage resources in a centralized way, making it easy to change them and distribute
+them to all the workers. For more detail, see :ref:`resource files <concept:resource files>`.
+
+In this section we show how to upload and use multiple resources, which is more common in production environments
+and in the real world.
+
+Overview
+--------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/multi_resources_example.py
+   :dedent: 0
+   :start-after: [start workflow]
+   :end-before: [end workflow]
+
+In this example, we upload two Python files to the resource center and use them in a single task. The files
+are named ``dependence.py`` and ``main.py``. ``main.py`` uses ``dependence.py`` as a dependency: it imports the
+variable ``now`` declared in ``dependence.py``. The shell task can therefore call :code:`python main.py`
+to get everything done.
+
+Upload Resources
+----------------
+
+First, import the ``Resource`` class.
+
+.. code-block:: python
+
+    from pydolphinscheduler.core.resource import Resource
+
+Then we create two resource objects and assign them to the ``resource_list`` of the workflow. The full content of
+each resource should be assigned to the ``content`` attribute of the resource object. Note that ``main.py``
+imports the variable :code:`now` from ``dependence.py``.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/multi_resources_example.py
+   :dedent: 0
+   :start-after: [start create_new_resources]
+   :end-before: [end create_new_resources]
+
+Use Resources
+-------------
+
+As with :ref:`using a single resource <concept:resource files>`, all we need to do is assign them to the
+``resource_list`` attribute of the task and then call the main file to run our task. In this example, we call
+:code:`python main.py`, which uses ``dependence.py`` as a dependency.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/multi_resources_example.py
+   :dedent: 0
+   :start-after: [start use_exists_resources]
+   :end-before: [end use_exists_resources]
+
+Running the workflow executes ``main.py`` and prints the current datetime. The result looks like this:
+
+.. code-block:: text
+
+    2022-11-29 16:16:51.952742
+
diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py
index cff2917..efab3e5 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -18,6 +18,7 @@
 """DolphinScheduler Task and TaskRelation object."""
 import copy
 import types
+import warnings
 from logging import getLogger
 from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
 
@@ -190,7 +191,7 @@ class Task(Base):
         self._workflow = workflow
 
     @property
-    def resource_list(self) -> List:
+    def resource_list(self) -> List[Dict[str, Resource]]:
         """Get task define attribute `resource_list`."""
         resources = set()
         for res in self._resource_list:
@@ -199,17 +200,19 @@ class Task(Base):
                     Resource(name=res, user_name=self.user_name).get_id_from_database()
                 )
             elif type(res) == dict and res.get(ResourceKey.ID) is not None:
-                logger.warning(
+                warnings.warn(
                     """`resource_list` should be defined using List[str] with resource paths,
                        the use of ids to define resources will be remove in version 3.2.0.
-                    """
+                    """,
+                    DeprecationWarning,
+                    stacklevel=2,
                 )
                 resources.add(res.get(ResourceKey.ID))
         return [{ResourceKey.ID: r} for r in resources]
 
     @property
     def user_name(self) -> Optional[str]:
-        """Return user name of workflow."""
+        """Return username of workflow."""
         if self.workflow:
             return self.workflow.user.name
         else:
diff --git a/src/pydolphinscheduler/core/workflow.py b/src/pydolphinscheduler/core/workflow.py
index db916d8..02906fb 100644
--- a/src/pydolphinscheduler/core/workflow.py
+++ b/src/pydolphinscheduler/core/workflow.py
@@ -419,6 +419,12 @@ class Workflow(Base):
         self._ensure_side_model_exists()
         self._pre_submit_check()
 
+        # resource should be created before workflow
+        if len(self.resource_list) > 0:
+            for res in self.resource_list:
+                res.user_name = self._user
+                res.create_or_update_resource()
+
         self._workflow_code = gateway.create_or_update_workflow(
             self._user,
             self._project,
@@ -438,10 +444,6 @@ class Workflow(Base):
             json.dumps(self.schedule_json) if self.schedule_json else None,
             None,
         )
-        if len(self.resource_list) > 0:
-            for res in self.resource_list:
-                res.user_name = self._user
-                res.create_or_update_resource()
         return self._workflow_code
 
     def start(self) -> None:
diff --git a/src/pydolphinscheduler/examples/multi_resources_example.py b/src/pydolphinscheduler/examples/multi_resources_example.py
new file mode 100644
index 0000000..dd06bd0
--- /dev/null
+++ b/src/pydolphinscheduler/examples/multi_resources_example.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This example shows how to upload files to the dolphinscheduler resource center and use them in tasks.
+
+When you want to create a new resource file in the resource center, you can add it to
+``workflow.resource_list`` using the following code:
+
+.. code-block:: python
+
+    with Workflow(
+        name="use_resource_center",
+        resource_list=[
+            Resource(name="new-name.py", content="print('hello world from resource center')"),
+        ],
+        ) as workflow:
+
+While the workflow is running, the resource file is created and uploaded to the dolphinscheduler resource
+center automatically.
+
+If you want to use the resource file in tasks, you can also use the ``resource_list`` parameter of the task
+constructor, as in the following code:
+
+.. code-block:: python
+
+    task_use_resource = Shell(
+        name="run_resource",
+        command="python new-name.py",
+        resource_list=[
+            "new-name.py",
+        ],
+    )
+
+The resource file is downloaded to the task's runtime working directory, which means you can execute
+it. In this example we run the file ``new-name.py`` the same way we would run a Python script in a terminal.
+We can also use resource files that already exist in the dolphinscheduler resource center, not only the
+new ones created in the current workflow.
+"""
+
+# [start workflow]
+from pydolphinscheduler.core import Workflow
+from pydolphinscheduler.core.resource import Resource
+from pydolphinscheduler.tasks import Shell
+
+dependence = "dependence.py"
+main = "main.py"
+
+with Workflow(
+    name="multi_resources_example",
+    tenant="tenant_exists",
+    # [start create_new_resources]
+    resource_list=[
+        Resource(
+            name=dependence,
+            content="from datetime import datetime\nnow = datetime.now()",
+        ),
+        Resource(name=main, content="from dependence import now\nprint(now)"),
+    ],
+    # [end create_new_resources]
+) as pd:
+    # [start use_exists_resources]
+    task_use_resource = Shell(
+        name="use-resource",
+        command=f"python {main}",
+        resource_list=[
+            dependence,
+            main,
+        ],
+    )
+    # [end use_exists_resources]
+
+    pd.run()
+# [end workflow]
diff --git a/src/pydolphinscheduler/models/tenant.py b/src/pydolphinscheduler/models/tenant.py
index 8b64707..146aec0 100644
--- a/src/pydolphinscheduler/models/tenant.py
+++ b/src/pydolphinscheduler/models/tenant.py
@@ -46,7 +46,7 @@ class Tenant(BaseSide):
         self, queue_name: str, user=configuration.USER_NAME
     ) -> None:
         """Create Tenant if not exists."""
-        tenant = gateway.create_tenant(self.name, self.description, queue_name)
+        tenant = gateway.create_tenant(self.name, queue_name, self.description)
         self.tenant_id = tenant.getId()
         self.code = tenant.getTenantCode()
         # gateway_result_checker(result, None)
diff --git a/tests/integration/test_resources.py b/tests/integration/test_resources.py
new file mode 100644
index 0000000..ee1d99d
--- /dev/null
+++ b/tests/integration/test_resources.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test pydolphinscheduler resources."""
+
+import pytest
+from py4j.java_gateway import JavaObject
+
+from pydolphinscheduler.core.resource import Resource
+from pydolphinscheduler.models.user import User
+from tests.testing.constants import UNIT_TEST_TENANT, UNIT_TEST_USER_NAME
+
+name = "unittest_resource.txt"
+content = "unittest_resource_content"
+
+
+@pytest.fixture(scope="module")
+def tmp_user():
+    """Get a temporary user."""
+    user = User(
+        name=UNIT_TEST_USER_NAME,
+        password="unittest-password",
+        email="test-email@abc.com",
+        phone="17366637777",
+        tenant=UNIT_TEST_TENANT,
+        queue="test-queue",
+        status=1,
+    )
+    user.create_if_not_exists()
+    yield
+    user.delete()
+
+
+def test_create_or_update(tmp_user):
+    """Test create or update resource to java gateway."""
+    resource = Resource(name=name, content=content, user_name=UNIT_TEST_USER_NAME)
+    result = resource.create_or_update_resource()
+    assert result is not None and isinstance(result, JavaObject)
+    assert result.getAlias() == name
+
+
+def test_get_resource_info(tmp_user):
+    """Test get resource info from java gateway."""
+    resource = Resource(name=name, user_name=UNIT_TEST_USER_NAME)
+    result = resource.get_info_from_database()
+    assert result is not None and isinstance(result, JavaObject)
+    assert result.getAlias() == name
diff --git a/tests/testing/constants.py b/tests/testing/constants.py
index 291a5a1..dbbf5e5 100644
--- a/tests/testing/constants.py
+++ b/tests/testing/constants.py
@@ -49,3 +49,6 @@ DEV_MODE = str(
 
 # default token
 TOKEN = "jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc"
+
+UNIT_TEST_USER_NAME = "unittest_user"
+UNIT_TEST_TENANT = "unittest_tenant"
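
Note on the src/pydolphinscheduler/core/task.py hunk above: it replaces `logger.warning` with `warnings.warn(..., DeprecationWarning, stacklevel=2)`. The sketch below is not part of the commit. It uses only the Python standard library to illustrate the likely effect of that switch: the message becomes a real `DeprecationWarning` that callers can filter, record, or turn into an error, and `stacklevel=2` attributes it to the calling code. The function `resource_ids` and the `"id"` key are hypothetical stand-ins for `Task.resource_list` and `ResourceKey.ID`.

```python
import warnings


def resource_ids(resource_list):
    """Hypothetical stand-in for Task.resource_list: collect ids from legacy dict entries."""
    ids = []
    for res in resource_list:
        if isinstance(res, dict) and res.get("id") is not None:
            # Emit a DeprecationWarning instead of a log record, so callers
            # can control it through the standard warnings filters.
            warnings.warn(
                "`resource_list` should be a list of resource paths; ids are deprecated.",
                DeprecationWarning,
                stacklevel=2,  # attribute the warning to the caller of resource_ids
            )
            ids.append(res["id"])
    return ids


# Record warnings so we can inspect what was emitted.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    ids = resource_ids([{"id": 7}])

print(ids, [w.category.__name__ for w in caught])
```

A test suite could go one step further and call `warnings.simplefilter("error", DeprecationWarning)` to make any remaining id-based usage fail loudly, which is not possible with a plain log message.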