You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/11/09 03:38:29 UTC
[dolphinscheduler-sdk-python] branch main updated: [migrate] Add tag 3.0.0 source code (#7)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new 7ed3bf9 [migrate] Add tag 3.0.0 source code (#7)
7ed3bf9 is described below
commit 7ed3bf9c44e285bc901ea4d133c7bdccaa3e0116
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Wed Nov 9 11:38:25 2022 +0800
[migrate] Add tag 3.0.0 source code (#7)
Add version 3.0.0 code from https://github.com/apache/dolphinscheduler/tree/3.0.0/dolphinscheduler-python/pydolphinscheduler
---
.coveragerc | 6 +-
.flake8 | 8 +-
.pre-commit-config.yaml | 56 +++++
DEVELOP.md | 139 ++++++++++-
LICENSE | 228 +++++++++++++++++
NOTICE | 5 +
README.md | 8 +-
UPDATING.md | 37 +++
docs/source/{tasks/index.rst => cli.rst} | 43 ++--
docs/source/conf.py | 5 +
docs/source/config.rst | 218 +++++++++++++++++
docs/source/{tasks => howto}/index.rst | 29 +--
docs/source/howto/remote-submit.rst | 51 ++++
docs/source/index.rst | 3 +
docs/source/start.rst | 112 +++++++--
docs/source/tasks/{index.rst => func_wrap.rst} | 40 ++-
docs/source/tasks/index.rst | 1 +
docs/source/tutorial.rst | 233 ++++++++++++------
pytest.ini | 5 +-
setup.py | 25 +-
src/pydolphinscheduler/{core => cli}/__init__.py | 12 +-
src/pydolphinscheduler/cli/commands.py | 92 +++++++
src/pydolphinscheduler/constants.py | 33 +--
src/pydolphinscheduler/core/__init__.py | 2 +
src/pydolphinscheduler/core/base_side.py | 4 +-
src/pydolphinscheduler/core/configuration.py | 193 +++++++++++++++
src/pydolphinscheduler/core/default_config.yaml | 58 +++++
src/pydolphinscheduler/core/process_definition.py | 99 ++++++--
.../core/{base_side.py => resource.py} | 31 +--
src/pydolphinscheduler/core/task.py | 38 ++-
.../examples/task_dependent_example.py | 6 +-
.../examples/tutorial_decorator.py | 91 +++++++
src/pydolphinscheduler/exceptions.py | 12 +-
src/pydolphinscheduler/java_gateway.py | 7 +-
src/pydolphinscheduler/side/project.py | 8 +-
src/pydolphinscheduler/side/queue.py | 6 +-
src/pydolphinscheduler/side/tenant.py | 8 +-
src/pydolphinscheduler/side/user.py | 21 +-
src/pydolphinscheduler/tasks/func_wrap.py | 61 +++++
src/pydolphinscheduler/tasks/python.py | 71 +++++-
src/pydolphinscheduler/tasks/sql.py | 30 ++-
src/pydolphinscheduler/tasks/switch.py | 6 +-
src/pydolphinscheduler/utils/file.py | 57 +++++
src/pydolphinscheduler/utils/yaml_parser.py | 159 ++++++++++++
.../core => tests/cli}/__init__.py | 12 +-
tests/cli/test_config.py | 198 +++++++++++++++
tests/cli/test_version.py | 59 +++++
tests/core/test_configuration.py | 272 +++++++++++++++++++++
.../task.py => core/test_default_config_yaml.py} | 25 +-
tests/core/test_process_definition.py | 141 ++++++++---
.../task.py => core/test_resource_definition.py} | 30 ++-
tests/core/test_task.py | 60 ++++-
tests/example/test_example.py | 4 +-
.../core => tests/integration}/__init__.py | 12 +-
tests/integration/conftest.py | 46 ++++
tests/integration/test_java_gateway.py | 53 ++++
.../integration/test_process_definition.py | 38 ++-
tests/integration/test_submit_examples.py | 56 +++++
tests/tasks/test_func_wrap.py | 169 +++++++++++++
tests/tasks/test_python.py | 40 ++-
tests/tasks/test_sql.py | 50 ++--
tests/testing/cli.py | 87 +++++++
tests/testing/constants.py | 20 ++
.../core/__init__.py => tests/testing/decorator.py | 22 +-
tests/testing/docker_wrapper.py | 98 ++++++++
tests/testing/{task.py => file.py} | 22 +-
tests/testing/path.py | 13 +-
tests/testing/task.py | 15 ++
tests/utils/test_file.py | 85 +++++++
tests/utils/test_yaml_parser.py | 255 +++++++++++++++++++
tox.ini | 61 +++++
71 files changed, 3826 insertions(+), 474 deletions(-)
diff --git a/.coveragerc b/.coveragerc
index 524cb73..1620509 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -20,8 +20,10 @@ command_line = -m pytest
omit =
# Ignore all test cases in tests/
tests/*
+ # Ignore examples directory
+ */pydolphinscheduler/examples/*
# TODO. Temporary ignore java_gateway file, because we could not find good way to test it.
- src/pydolphinscheduler/java_gateway.py
+ */pydolphinscheduler/java_gateway.py
[report]
# Don’t report files that are 100% covered
@@ -29,4 +31,4 @@ skip_covered = True
show_missing = True
precision = 2
# Report will fail when coverage under 90.00%
-fail_under = 85
+fail_under = 90
diff --git a/.flake8 b/.flake8
index 7f659a2..120b42f 100644
--- a/.flake8
+++ b/.flake8
@@ -26,7 +26,9 @@ exclude =
old,
build,
dist,
- htmlcov
+ htmlcov,
+ .tox,
+ dist,
ignore =
# It's clear and not need to add docstring
D107, # D107: Don't require docstrings on __init__
@@ -34,5 +36,5 @@ ignore =
# Conflict to Black
W503 # W503: Line breaks before binary operators
per-file-ignores =
- src/pydolphinscheduler/side/__init__.py:F401
- src/pydolphinscheduler/tasks/__init__.py:F401
+ */pydolphinscheduler/side/__init__.py:F401
+ */pydolphinscheduler/tasks/__init__.py:F401
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..9e817a5
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# See https://pre-commit.com for more information
+# See https://pre-commit.com/hooks.html for more hooks
+
+default_stages: [commit, push]
+default_language_version:
+ # force all python hooks to run python3
+ python: python3
+repos:
+ - repo: https://github.com/pycqa/isort
+ rev: 5.10.1
+ hooks:
+ - id: isort
+ name: isort (python)
+ - repo: https://github.com/psf/black
+ rev: 22.1.0
+ hooks:
+ - id: black
+ - repo: https://github.com/pycqa/flake8
+ rev: 4.0.1
+ hooks:
+ - id: flake8
+ additional_dependencies: [
+ 'flake8-docstrings>=1.6',
+ 'flake8-black>=0.2',
+ ]
+ # pre-commit run in the root, so we have to point out the full path of configuration
+ args: [
+ --config,
+ dolphinscheduler-python/pydolphinscheduler/.flake8
+ ]
+ - repo: https://github.com/pycqa/autoflake
+ rev: v1.4
+ hooks:
+ - id: autoflake
+ args: [
+ --remove-all-unused-imports,
+ --ignore-init-module-imports,
+ --in-place
+ ]
diff --git a/DEVELOP.md b/DEVELOP.md
index f22ab86..bdd0416 100644
--- a/DEVELOP.md
+++ b/DEVELOP.md
@@ -34,7 +34,7 @@ Now, we should install all dependence to make sure we could run test or check co
```shell
cd dolphinscheduler/dolphinscheduler-python/pydolphinscheduler
-pip install .[dev]
+python -m pip install .[dev]
```
Next, we have to open pydolphinscheduler project in you editor. We recommend you use [pycharm][pycharm]
@@ -60,7 +60,57 @@ pydolphinscheduler tasks object, we use tasks to define exact job we want Dolphi
we only support `shell` task to execute shell task. [This link][all-task] list all tasks support in DolphinScheduler
and would be implemented in the further.
-## Code Style
+## Test Your Code
+
+Linting and tests are very important for an open source project, so we pay close attention to them. We have continuous
+integration service run by GitHub Action to test whether the patch is good or not, which you could jump to
+section [With GitHub Action](#with-github-action) see more detail.
+
+And to make more convenience to local tests, we also have the way to run your [test automated with tox](#automated-testing-with-tox)
+locally(*run all tests except integrate test with need docker environment*). It is helpful when your try to find out the
+detail when continuous integration in GitHub Action failed, or you have a great patch and want to test local first.
+
+Besides [automated testing with tox](#automated-testing-with-tox) locally, we also have a [manual way](#manually)
+run tests. And it is scattered commands to reproduce each step of the integration test we told about.
+
+* Remote
+ * [With GitHub Action](#with-github-action)
+* Local
+ * [Automated Testing With tox](#automated-testing-with-tox)(including all but integrate test)
+ * [Manually](#manually)(with integrate test)
+
+### With GitHub Action
+
+GitHub Action test in various environment for pydolphinscheduler, including different python version in
+`3.6|3.7|3.8|3.9` and operating system `linux|macOS|windows`. It will trigger and run automatically when you
+submit pull requests to `apache/dolphinscheduler`.
+
+### Automated Testing With tox
+
+[tox](https://tox.wiki) is a package aims to automate and standardize testing in Python, both our continuous
+integration and local test use it to run actual task. To use it, you should install it first
+
+```shell
+python -m pip install --upgrade tox
+```
+
+After installation, you could run a single command to run all the tests, it is almost like test in GitHub Action
+but not so much different environment.
+
+```shell
+tox -e local-ci
+```
+
+It will take a while when you run it the first time, because it has to install dependencies and do some preparation,
+and the next time you run it will be faster.
+
+If you fail the `lint` section when you run command `tox -e local-ci`, you could try to run command `tox -e auto-lint`,
+which we provide to fix as many lint issues as possible. When it finishes, you could run command `tox -e local-ci` to see
+whether the linter passes or not; you have to fix it by yourself if the linter still fails.
+
+### Manually
+
+#### Code Style
We use [isort][isort] to automatically keep Python imports alphabetically, and use [Black][black] for code
formatter and [Flake8][flake8] for pep8 checker. If you use [pycharm][pycharm]or [IntelliJ IDEA][idea],
@@ -69,18 +119,49 @@ maybe you could follow [Black-integration][black-editor] to configure them in yo
Our Python API CI would automatically run code style checker and unittest when you submit pull request in
GitHub, you could also run static check locally.
+We recommend [pre-commit](https://pre-commit.com/) to do the checker mentioned above before you develop locally.
+You should install `pre-commit` by running
+
+```shell
+python -m pip install pre-commit
+```
+
+in your development environment and then run `pre-commit install` to set up the git hooks scripts. After finish
+above steps, each time you run `git commit` or `git push` would run pre-commit check to make basic check before
+you create pull requests in GitHub.
+
```shell
# We recommend you run isort and Black before Flake8, because Black could auto fix some code style issue
# but Flake8 just hint when code style not match pep8
# Run Isort
-isort .
+python -m isort .
# Run Black
-black .
+python -m black .
# Run Flake8
-flake8
+python -m flake8
+```
+
+#### Testing
+
+## Build Docs
+
+We use [sphinx][sphinx] to build docs. Dolphinscheduler Python API CI would automatically build docs when you submit pull request in
+GitHub. You may locally ensure docs can be built successfully in case the failure blocks CI.
+
+To build docs locally, install sphinx and related python modules first via:
+
+```shell
+python -m pip install '.[doc]'
+```
+
+Then
+
+```shell
+cd pydolphinscheduler/docs/
+make clean && make html
```
## Testing
@@ -88,25 +169,58 @@ flake8
pydolphinscheduler using [pytest][pytest] to test our codebase. GitHub Action will run our test when you create
pull request or commit to dev branch, with python version `3.6|3.7|3.8|3.9` and operating system `linux|macOS|windows`.
-To test locally, you could directly run pytest after set `PYTHONPATH`
+pydolphinscheduler using [pytest][pytest] to run all tests in directory `tests`. You could run tests by the commands
```shell
-PYTHONPATH=src/ pytest
+python -m pytest --cov=pydolphinscheduler --cov-config=.coveragerc tests/
```
-We try to keep pydolphinscheduler usable through unit test coverage. 90% test coverage is our target, but for
-now, we require test coverage up to 85%, and each pull request leas than 85% would fail our CI step
-`Tests coverage`. We use [coverage][coverage] to check our test coverage, and you could check it locally by
-run command.
+Besides run tests, it will also check the unit test [coverage][coverage] threshold, for now when test cover less than 90%
+will fail the coverage, as well as our GitHub Action.
+
+The command above will check test coverage automatically, and you could also test the coverage by command.
```shell
-coverage run && coverage report
+python -m coverage run && python -m coverage report
```
It would not only run unit test but also show each file coverage which cover rate less than 100%, and `TOTAL`
line show you total coverage of you code. If your CI failed with coverage you could go and find some reason by
this command output.
+#### Integrate Test
+
+Integrate Test can not run when you execute command `tox -e local-ci` because it needs external environment
+including [Docker](https://docs.docker.com/get-docker/) and specific image build by [maven](https://maven.apache.org/install.html).
+Here we would show you the step to run integrate test in directory `dolphinscheduler-python/pydolphinscheduler/tests/integration`.
+
+```shell
+# Go to project root directory and build Docker image
+cd ../../
+
+# Build Docker image
+./mvnw -B clean install \
+ -Dmaven.test.skip \
+ -Dmaven.javadoc.skip \
+ -Dmaven.checkstyle.skip \
+ -Pdocker,release -Ddocker.tag=ci \
+ -pl dolphinscheduler-standalone-server -am
+
+# Go to pydolphinscheduler root directory and run integrate tests
+tox -e integrate-test
+```
+
+## Add LICENSE When New Dependencies Adding
+
+When you add a new package in pydolphinscheduler, you should also add the package's LICENSE to directory
+`dolphinscheduler-dist/release-docs/licenses/python-api-licenses`, and also add a short description to
+`dolphinscheduler-dist/release-docs/LICENSE`.
+
+## Update `UPDATING.md` when public class, method or interface is be changed
+
+When you change public class, method or interface, you should change the [UPDATING.md](./UPDATING.md) to notice
+users who may use it in other way.
+
<!-- content -->
[py4j]: https://www.py4j.org/index.html
[pycharm]: https://www.jetbrains.com/pycharm
@@ -118,3 +232,4 @@ this command output.
[black-editor]: https://black.readthedocs.io/en/stable/integrations/editors.html#pycharm-intellij-idea
[coverage]: https://coverage.readthedocs.io/en/stable/
[isort]: https://pycqa.github.io/isort/index.html
+[sphinx]: https://www.sphinx-doc.org/en/master/
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..a7359fa
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,228 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "{}"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright {yyyy} {name of copyright owner}
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+============================================================================
+Apache DolphinScheduler Python API SUBCOMPONENTS:
+
+The Apache DolphinScheduler Python API project contains subcomponents
+with separate copyright notices and license terms. Your use of the source
+code for these subcomponents is subject to the terms and conditions
+of the following licenses.
+
+========================================================================
+BSD licenses
+========================================================================
+
+The following components are provided under a BSD license. See project link for details.
+The text of each license is also included at licenses/LICENSE-[project].txt.
+
+ py4j v0.10 (https://github.com/py4j/py4j)
+ click v8.0 (https://github.com/pallets/click)
+
+========================================================================
+MIT licenses
+========================================================================
+
+The following components are provided under the MIT License. See project link for details.
+The text of each license is also included at licenses/LICENSE-[project].txt.
+
+ ruamel.yaml v0.17 (https://sourceforge.net/projects/ruamel-yaml/)
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..61acdab
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,5 @@
+Apache DolphinScheduler
+Copyright 2017-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/README.md b/README.md
index 9cc524d..316d604 100644
--- a/README.md
+++ b/README.md
@@ -38,11 +38,11 @@ your workflow by python code, aka workflow-as-codes.
```shell
# Install
-$ pip install apache-dolphinscheduler
+python -m pip install apache-dolphinscheduler
-# Check installation, it is success if you see version output, here we use 0.1.0 as example
-$ python -c "import pydolphinscheduler; print(pydolphinscheduler.__version__)"
-0.1.0
+# Verify installation is successful, it will show the version of apache-dolphinscheduler, here we use 0.1.0 as example
+pydolphinscheduler version
+# 0.1.0
```
Here we show you how to install and run a simple example of pydolphinscheduler
diff --git a/UPDATING.md b/UPDATING.md
new file mode 100644
index 0000000..d772c6f
--- /dev/null
+++ b/UPDATING.md
@@ -0,0 +1,37 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# UPDATING
+
+This file documents non-backward compatible updates and notifies users of the detailed changes in pydolphinscheduler.
+It started after version 2.0.5 was released.
+
+## dev
+
+* Change variable about where to keep pydolphinscheduler configuration from ``PYDOLPHINSCHEDULER_HOME`` to
+ ``PYDS_HOME`` which is same as other environment variable name.
+
+## 3.0.0a0
+
+* Integrate Python gateway server into Dolphinscheduler API server, and you could start Python gateway service by command
+ `./bin/dolphinscheduler-daemon.sh start api-server` instead of independent command
+ `./bin/dolphinscheduler-daemon.sh start python-gateway-server`.
+* Remove parameter `queue` from class `ProcessDefinition` to avoid confusing users when it is changed but does not work
+* Change `yaml_parser.py` method `to_string` to magic method `__str__` to make it more pythonic.
+* Use package ``ruamel.yaml`` replace ``pyyaml`` for write yaml file with comment.
diff --git a/docs/source/tasks/index.rst b/docs/source/cli.rst
similarity index 63%
copy from docs/source/tasks/index.rst
copy to docs/source/cli.rst
index 42dcdf9..60e8231 100644
--- a/docs/source/tasks/index.rst
+++ b/docs/source/cli.rst
@@ -15,27 +15,22 @@
specific language governing permissions and limitations
under the License.
-Tasks
-=====
-
-In this section
-
-.. toctree::
- :maxdepth: 1
-
- shell
- sql
- python
- http
-
- switch
- condition
- dependent
-
- spark
- flink
- map_reduce
- procedure
-
- datax
- sub_process
+Command Line Interface
+======================
+
+*PyDolphinScheduler* has a mechanism called CLI (command line interface) to help users control it in the shell.
+
+Prepare
+-------
+
+You have to :ref:`install PyDolphinScheduler <start:installing pydolphinscheduler>` first before you using
+its CLI
+
+Usage
+-----
+
+Here is basic usage about the command line of *PyDolphinScheduler*
+
+.. click:: pydolphinscheduler.cli.commands:cli
+ :prog: pydolphinscheduler
+ :nested: full
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 5ee73a5..b162e0c 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -55,6 +55,11 @@ extensions = [
"sphinx.ext.viewcode",
"sphinx.ext.autosectionlabel",
"sphinx_rtd_theme",
+ # Documenting command line interface
+ "sphinx_click.ext",
+ # Add inline tabbed content
+ "sphinx_inline_tabs",
+ "sphinx_copybutton",
]
# Add any paths that contain templates here, relative to this directory.
diff --git a/docs/source/config.rst b/docs/source/config.rst
new file mode 100644
index 0000000..feb147a
--- /dev/null
+++ b/docs/source/config.rst
@@ -0,0 +1,218 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Configuration
+=============
+
+pydolphinscheduler has a built-in module that sets the necessary configuration to start and run your workflow code.
+You could use it directly if you only want to run a quick start or a simple job like a POC. But if you
+want to use pydolphinscheduler in depth, or even use it in production, you should probably modify and
+change the built-in configuration.
+
+We have two ways to modify the configuration:
+
+- `Using Environment Variables`_: The more lightweight way to modify the configuration. it is useful in
+ containerization scenarios, like docker and k8s, or when you like to temporarily override configs in the
+ configuration file.
+- `Using Configuration File`_: The more general way to modify the configuration. It is useful when you want
+ to persist and manage configuration files in one single file.
+
+Using Environment Variables
+---------------------------
+
+You could change the configuration by adding or modifying the operating system's environment variables. No
+matter what way you used, as long as you can successfully modify the environment variables. We use two common
+ways, `Bash <by bash>`_ and `Python OS Module <by python os module>`_, as examples:
+
+By Bash
+^^^^^^^
+
+Setting environment variables via `Bash` is the most straightforward and easiest way. We give some examples about
+how to change them by Bash.
+
+.. code-block:: bash
+
+ # Modify Java Gateway Address
+ export PYDS_JAVA_GATEWAY_ADDRESS="192.168.1.1"
+
+ # Modify Workflow Default User
+ export PYDS_WORKFLOW_USER="custom-user"
+
+After executing the commands above, both ``PYDS_JAVA_GATEWAY_ADDRESS`` and ``PYDS_WORKFLOW_USER`` will be changed.
+The next time you execute and submit your workflow, it will submit to host `192.168.1.1`, and with workflow's user
+named `custom-user`.
+
+By Python OS Module
+^^^^^^^^^^^^^^^^^^^
+
+pydolphinscheduler is a Python API for Apache DolphinScheduler, and you could modify or add system environment
+variables via Python ``os`` module. In this example, we change variables as the same value as we change in
+`Bash <by bash>`_. It will take effect the next time you run your workflow, and call workflow ``run`` or ``submit``
+method next to ``os.environ`` statement.
+
+.. code-block:: python
+
+ import os
+ # Modify Java Gateway Address
+ os.environ["PYDS_JAVA_GATEWAY_ADDRESS"] = "192.168.1.1"
+
+ # Modify Workflow Default User
+ os.environ["PYDS_WORKFLOW_USER"] = "custom-user"
+
+All Configurations in Environment Variables
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+All environment variables as below, and you could modify their value via `Bash <by bash>`_ or `Python OS Module <by python os module>`_
+
++------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| Variable Section | Variable Name | description |
++==================+====================================+====================================================================================================================+
+| | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway address, will use its value when it is set. |
++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway port, will use its value when it is set. |
++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java gateway auto convert, will use its value when it is set. |
++------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_USER_NAME`` | Default user name, will use when user's ``name`` when does not specify. |
++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_USER_PASSWORD`` | Default user password, will use when user's ``password`` when does not specify. |
++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| Default User | ``PYDS_USER_EMAIL`` | Default user email, will use when user's ``email`` when does not specify. |
++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_USER_PHONE`` | Default user phone, will use when user's ``phone`` when does not specify. |
++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_USER_STATE`` | Default user state, will use when user's ``state`` when does not specify. |
++------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_PROJECT`` | Default workflow project name, will use its value when workflow does not specify the attribute ``project``. |
++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_TENANT`` | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``. |
++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow user, will use its value when workflow does not specify the attribute ``user``. |
++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_QUEUE`` | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``. |
++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. |
++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_RELEASE_STATE`` | Default workflow release state, will use its value when workflow does not specify the attribute ``release_state``. |
++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``timezone``. |
++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. |
++------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+
+.. note::
+
+ The scope of setting configuration via environment variable is in the workflow, and it will not change the
+ value of the configuration file. The :doc:`CLI <cli>` command ``config --get`` and ``config --set`` operate
+ the value of the configuration file, so the command ``config --get`` may return a different value from what
+ you set in the environment variable, and command ``config --get`` will never change your environment variable.
+
+Using Configuration File
+------------------------
+
+If you want to persist and manage configuration in a file instead of environment variables, or maybe you
+want to save your configuration file to a version control system, like Git or SVN, then changing
+configuration via the file is the best choice.
+
+Export Configuration File
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+pydolphinscheduler allows you to change the built-in configurations via CLI or editor you like. pydolphinscheduler
+integrated built-in configurations in its package, but you could also export it locally by CLI
+
+.. code-block:: bash
+
+ pydolphinscheduler config --init
+
+And it will create a new YAML file in the path `~/pydolphinscheduler/config.yaml` by default. If you want to export
+it to another path, you should set `PYDS_HOME` before you run command :code:`pydolphinscheduler config --init`.
+
+.. code-block:: bash
+
+ export PYDS_HOME=<CUSTOM_PATH>
+ pydolphinscheduler config --init
+
+After that, your configuration file will export into `<CUSTOM_PATH>/config.yaml` instead of the default path.
+
+Change Configuration
+^^^^^^^^^^^^^^^^^^^^
+
+In section `export configuration file`_ you export the configuration file locally, and as a local file, you could
+edit it with any editor you like. After you save your change in your editor, the latest configuration will work
+when you run your workflow code.
+
+You could also query or change the configuration via CLI :code:`config --get <config>` or :code:`config --set <config> <val>`.
+Both `--get` and `--set` could be called one or more times in a single command, and you could only set the leaf
+node of the configuration, but you could get the parent configuration. There are simple examples below:
+
+.. code-block:: bash
+
+ # Get single configuration in the leaf node,
+ # The output look like below:
+ # java_gateway.address = 127.0.0.1
+ pydolphinscheduler config --get java_gateway.address
+
+ # Get multiple configuration in the leaf node,
+ # The output look like below:
+ # java_gateway.address = 127.0.0.1
+ # java_gateway.port = 25333
+ pydolphinscheduler config --get java_gateway.address --get java_gateway.port
+
+
+ # Get parent configuration which contain multiple leaf nodes,
+ # The output look like below:
+ # java_gateway = ordereddict([('address', '127.0.0.1'), ('port', 25333), ('auto_convert', True)])
+ pydolphinscheduler config --get java_gateway
+
+ # Set single configuration,
+ # The output look like below:
+ # Set configuration done.
+ pydolphinscheduler config --set java_gateway.address 192.168.1.1
+
+ # Set multiple configuration
+ # The output look like below:
+ # Set configuration done.
+ pydolphinscheduler config --set java_gateway.address 192.168.1.1 --set java_gateway.port 25334
+
+ # Set configuration not in leaf node will fail
+ # The output look like below:
+ # Raise error.
+ pydolphinscheduler config --set java_gateway 192.168.1.1,25334,True
+
+For more information about our CLI, you could see document :doc:`cli`.
+
+All Configurations in File
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Here are all our configurations for pydolphinscheduler.
+
+.. literalinclude:: ../../src/pydolphinscheduler/core/default_config.yaml
+ :language: yaml
+ :lines: 18-
+
+Priority
+--------
+
+We have two ways to modify the configuration and there is a built-in config in pydolphinscheduler too. It is
+very important to understand the priority of the configuration when you use them. The overview of configuration
+priority is.
+
+``Environment Variables > Configurations File > Built-in Configurations``
+
+This means that your setting in environment variables or configurations file will overwrite the built-in one.
+And you could temporarily modify configurations by setting environment variables without modifying the global
+config in the configuration file.
diff --git a/docs/source/tasks/index.rst b/docs/source/howto/index.rst
similarity index 71%
copy from docs/source/tasks/index.rst
copy to docs/source/howto/index.rst
index 42dcdf9..a0b3c29 100644
--- a/docs/source/tasks/index.rst
+++ b/docs/source/howto/index.rst
@@ -15,27 +15,16 @@
specific language governing permissions and limitations
under the License.
-Tasks
-=====
+HOWTOs
+======
-In this section
+pydolphinscheduler HOWTOs are documents that cover a single, specific topic, and attempt to cover it fairly
+completely. This collection is an effort to foster documentation that is more detailed than the :doc:`../concept`
+and :doc:`../tutorial`.
+
+Currently, the HOWTOs are:
.. toctree::
- :maxdepth: 1
+ :maxdepth: 2
- shell
- sql
- python
- http
-
- switch
- condition
- dependent
-
- spark
- flink
- map_reduce
- procedure
-
- datax
- sub_process
+ remote-submit
diff --git a/docs/source/howto/remote-submit.rst b/docs/source/howto/remote-submit.rst
new file mode 100644
index 0000000..b7efdf4
--- /dev/null
+++ b/docs/source/howto/remote-submit.rst
@@ -0,0 +1,51 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Submit Your Code from Different machine
+=======================================
+
+Generally, we use pydolphinscheduler as a client to DolphinScheduler, and consider we may change our workflow
+code frequently, the best practice is running :ref:`python gateway service <start:start python gateway service>`
+in your server machine and submit the workflow code from your development machine, like a laptop or PC. This behavior
+is supported by pydolphinscheduler out of the box with one or two simple commands.
+
+Export Configuration File
+-------------------------
+
+.. code-block:: bash
+
+ pydolphinscheduler config --init
+
+you could find more detail in :ref:`configuration exporting <config:export configuration file>`
+
+Run API Server in Other Host
+----------------------------
+
+.. code-block:: bash
+
+ pydolphinscheduler config --set java_gateway.address <your-api-server-ip-or-hostname>
+
+you could find more detail in :ref:`configuration setting <config:change configuration>`
+
+Run API Server in Other Port
+----------------------------
+
+.. code-block:: bash
+
+ pydolphinscheduler config --set java_gateway.port <your-python-gateway-service-port>
+
+you could find more detail in :ref:`configuration setting <config:change configuration>`
diff --git a/docs/source/index.rst b/docs/source/index.rst
index b04c26f..24ad107 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -32,6 +32,9 @@ then go and see :doc:`tutorial` for more detail.
tutorial
concept
tasks/index
+ howto/index
+ cli
+ config
api
Indices and tables
diff --git a/docs/source/start.rst b/docs/source/start.rst
index 0af90d5..270b5b8 100644
--- a/docs/source/start.rst
+++ b/docs/source/start.rst
@@ -41,8 +41,9 @@ without error(here is a example after Python 3.8.7 installed)
.. code-block:: bash
- $ python --version
- Python 3.8.7
+ python --version
+
+You will see the detail of your Python version, such as *Python 3.8.7*
Installing PyDolphinScheduler
-----------------------------
@@ -52,16 +53,24 @@ After Python is already installed on your machine following section
.. code-block:: bash
- $ pip install apache-dolphinscheduler
+ python -m pip install apache-dolphinscheduler
The latest version of *PyDolphinScheduler* would be installed after you run above
-command in your terminal. You could go and `start Python Gateway Server`_ to finish
+command in your terminal. You could go and `start Python Gateway Service`_ to finish
the prepare, and then go to :doc:`tutorial` to make your hand dirty. But if you
want to install the unreleased version of *PyDolphinScheduler*, you could go and see
-section `installing PyDolphinScheduler in dev`_ for more detail.
+section `installing PyDolphinScheduler in dev branch`_ for more detail.
+
+.. note::
-Installing PyDolphinScheduler In Dev
-------------------------------------
+ Currently, we have released multiple pre-release packages in PyPI, and you can see all released packages,
+ including pre-releases, in `release history <https://pypi.org/project/apache-dolphinscheduler/#history>`_.
+ You can pin the package version if you want to install a pre-release package. For example, if
+ you want to install the version `3.0.0-beta-2` package, you can run the command
+ :code:`python -m pip install apache-dolphinscheduler==3.0.0b2`.
+
+Installing PyDolphinScheduler In DEV Branch
+-------------------------------------------
Because the project is developing and some of the features still not release.
If you want to try some thing unreleased you could install from the source code
@@ -69,45 +78,94 @@ which we hold in GitHub
.. code-block:: bash
- # Clone Apache DolphinScheduler repository
- $ git clone git@github.com:apache/dolphinscheduler.git
- # Install PyDolphinScheduler in develop mode
- $ cd dolphinscheduler-python/pydolphinscheduler && pip install -e .
+ # Clone Apache DolphinScheduler repository
+ git clone git@github.com:apache/dolphinscheduler.git
+ # Install PyDolphinScheduler in develop mode
+ cd dolphinscheduler-python/pydolphinscheduler && python -m pip install -e .
-After you installed *PyDolphinScheduler*, please remember `start Python Gateway Server`_
+After you installed *PyDolphinScheduler*, please remember `start Python Gateway Service`_
which waiting for *PyDolphinScheduler*'s workflow definition require.
-Start Python Gateway Server
----------------------------
+Above command will clone whole dolphinscheduler source code to local, maybe you want to install latest pydolphinscheduler
+package directly and do not care about other code(including Python gateway service code), you can execute command
+
+.. code-block:: bash
+
+ # Must escape the '&' character by adding '\'
+ pip install -e "git+https://github.com/apache/dolphinscheduler.git#egg=apache-dolphinscheduler&subdirectory=dolphinscheduler-python/pydolphinscheduler"
+
+Start Python Gateway Service
+----------------------------
Since **PyDolphinScheduler** is Python API for `Apache DolphinScheduler`_, it
could define workflow and tasks structure, but could not run it unless you
-`install Apache DolphinScheduler`_ and start Python gateway server. We only
-and some key steps here and you could go `install Apache DolphinScheduler`_
-for more detail
+`install Apache DolphinScheduler`_ and start its API server, which includes the
+Python gateway service in it. We only show some key steps here, and you could
+go to `install Apache DolphinScheduler`_ for more detail
.. code-block:: bash
- # Start pythonGatewayServer
- $ ./bin/dolphinscheduler-daemon.sh start pythonGatewayServer
+ # Start DolphinScheduler api-server which including python gateway service
+ ./bin/dolphinscheduler-daemon.sh start api-server
To check whether the server is alive or not, you could run :code:`jps`. And
-the server is health if keyword `PythonGatewayServer` in the console.
+the server is health if keyword `ApiApplicationServer` in the console.
+
+.. code-block:: bash
+
+ jps
+ # ....
+ # 201472 ApiApplicationServer
+ # ....
+
+.. note::
+
+ Please make sure you have enabled the Python gateway service along with `api-server`. The configuration key is
+ `python-gateway.enabled : true` in api-server's configuration file `api-server/conf/application.yaml`.
+ The default value is true, and the Python gateway service starts when the api server is started.
+
+Run an Example
+--------------
+
+Before running an example for pydolphinscheduler, you should get the example code from its source code. You could run
+a single bash command to get it
.. code-block:: bash
- $ jps
- ....
- 201472 PythonGatewayServer
- ....
+ wget https://raw.githubusercontent.com/apache/dolphinscheduler/dev/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial.py
+
+or you could copy-paste the content from `tutorial source code`_. And then you could run the example in your
+terminal
+
+.. code-block:: bash
+
+ python tutorial.py
+
+If you want to submit your workflow to a remote API server, which means that your workflow script is different
+from the API server, you should first change pydolphinscheduler configuration and then submit the workflow script
+
+.. code-block:: bash
+
+ pydolphinscheduler config --init
+ pydolphinscheduler config --set java_gateway.address <YOUR-API-SERVER-IP-OR-HOSTNAME>
+ python tutorial.py
+
+.. note::
+
+ You could see more information in :doc:`config` about all the configurations pydolphinscheduler supported.
+
+After that, you could go and see your DolphinScheduler web UI to find out a new workflow created by pydolphinscheduler,
+and the path of web UI is `Project -> Workflow -> Workflow Definition`.
+
What's More
-----------
-If you do not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial`
-and see how it work. But if you already know the inside of *PyDolphinScheduler*,
-maybe you could go and play with all :doc:`tasks/index` *PyDolphinScheduler* supports.
+If you are not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial` and see how it works. But
+if you already know the basic usage or concept of *PyDolphinScheduler*, you could go and play with all
+:doc:`tasks/index` *PyDolphinScheduler* supports, or see our :doc:`howto/index` about useful cases.
.. _`instructions for all platforms here`: https://wiki.python.org/moin/BeginnersGuide/Download
.. _`Apache DolphinScheduler`: https://dolphinscheduler.apache.org
.. _`install Apache DolphinScheduler`: https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/installation/standalone.html
+.. _`tutorial source code`: https://raw.githubusercontent.com/apache/dolphinscheduler/dev/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial.py
diff --git a/docs/source/tasks/index.rst b/docs/source/tasks/func_wrap.rst
similarity index 69%
copy from docs/source/tasks/index.rst
copy to docs/source/tasks/func_wrap.rst
index 42dcdf9..5f41b80 100644
--- a/docs/source/tasks/index.rst
+++ b/docs/source/tasks/func_wrap.rst
@@ -15,27 +15,19 @@
specific language governing permissions and limitations
under the License.
-Tasks
-=====
-
-In this section
-
-.. toctree::
- :maxdepth: 1
-
- shell
- sql
- python
- http
-
- switch
- condition
- dependent
-
- spark
- flink
- map_reduce
- procedure
-
- datax
- sub_process
+Python Function Wrapper
+=======================
+
+A decorator that converts a Python function into a pydolphinscheduler task.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/tutorial_decorator.py
+ :start-after: [start tutorial]
+ :end-before: [end tutorial]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.func_wrap
\ No newline at end of file
diff --git a/docs/source/tasks/index.rst b/docs/source/tasks/index.rst
index 42dcdf9..d6bbb96 100644
--- a/docs/source/tasks/index.rst
+++ b/docs/source/tasks/index.rst
@@ -23,6 +23,7 @@ In this section
.. toctree::
:maxdepth: 1
+ func_wrap
shell
sql
python
diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst
index 83c5746..6366c80 100644
--- a/docs/source/tutorial.rst
+++ b/docs/source/tutorial.rst
@@ -18,129 +18,202 @@
Tutorial
========
-This tutorial show you the basic concept of *PyDolphinScheduler* and tell all
+This tutorial shows you the basic concept of *PyDolphinScheduler* and tells all
things you should know before you submit or run your first workflow. If you
-still not install *PyDolphinScheduler* and start Apache DolphinScheduler, you
-could go and see :ref:`how to getting start PyDolphinScheduler <start:getting started>`
+still have not installed *PyDolphinScheduler* and started DolphinScheduler, you
+could go and see :ref:`how to get started with PyDolphinScheduler <start:getting started>` first.
Overview of Tutorial
--------------------
-Here have an overview of our tutorial, and it look a little complex but do not
-worry about that because we explain this example below as detailed as possible.
+Here is an overview of our tutorial, and it looks a little complex, but do not
+worry about that because we explain this example below in as much detail as possible.
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
- :start-after: [start tutorial]
- :end-before: [end tutorial]
+There are two types of tutorials: traditional and task decorator.
+
+- **Traditional Way**: More general, support many :doc:`built-in task types <tasks/index>`, it is convenient
+ when you build your workflow at the beginning.
+- **Task Decorator**: A Python decorator that allows you to wrap your function into a pydolphinscheduler task. Less
+  versatile than the traditional way because it only supports Python functions, without built-in task
+  support. But it is helpful if your workflow is all built with Python or if you already have some Python
+  workflow code and want to migrate it to pydolphinscheduler.
+
+.. tab:: Tradition
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+ :dedent: 0
+ :start-after: [start tutorial]
+ :end-before: [end tutorial]
+
+.. tab:: Task Decorator
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+ :dedent: 0
+ :start-after: [start tutorial]
+ :end-before: [end tutorial]
Import Necessary Module
-----------------------
-First of all, we should importing necessary module which we would use later just
-like other Python package. We just create a minimum demo here, so we just import
-:class:`pydolphinscheduler.core.process_definition` and
-:class:`pydolphinscheduler.tasks.shell`.
+First of all, we should import the necessary module which we would use later just like other Python packages.
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
- :start-after: [start package_import]
- :end-before: [end package_import]
+.. tab:: Tradition
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+ :dedent: 0
+ :start-after: [start package_import]
+ :end-before: [end package_import]
-If you want to use other task type you could click and
-:doc:`see all tasks we support <tasks/index>`
+ In tradition tutorial we import :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` and
+ :class:`pydolphinscheduler.tasks.shell.Shell`.
+
+ If you want to use other task type you could click and :doc:`see all tasks we support <tasks/index>`
+
+.. tab:: Task Decorator
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+ :dedent: 0
+ :start-after: [start package_import]
+ :end-before: [end package_import]
+
+ In task decorator tutorial we import :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` and
+ :func:`pydolphinscheduler.tasks.func_wrap.task`.
Process Definition Declaration
------------------------------
-We should instantiate object after we import them from `import necessary module`_.
-Here we declare basic arguments for process definition(aka, workflow). We define
-the name of process definition, using `Python context manager`_ and it
-**the only required argument** for object process definition. Beside that we also
-declare three arguments named `schedule`, `start_time` which setting workflow schedule
-interval and schedule start_time, and argument `tenant` which changing workflow's
-task running user in the worker, :ref:`section tenant <concept:tenant>` in *PyDolphinScheduler*
-:doc:`concept` page have more detail information.
+We should instantiate :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` object after we
+import them from `import necessary module`_. Here we declare basic arguments for process definition(aka, workflow).
+We define the name of :code:`ProcessDefinition`, using `Python context manager`_ and it is **the only required argument**
+for `ProcessDefinition`. Besides, we also declare three arguments named :code:`schedule` and :code:`start_time`
+which setting workflow schedule interval and schedule start_time, and argument :code:`tenant` defines which tenant
+will be running this task in the DolphinScheduler worker. See :ref:`section tenant <concept:tenant>` in
+*PyDolphinScheduler* :doc:`concept` for more information.
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
- :start-after: [start workflow_declare]
- :end-before: [end workflow_declare]
+.. tab:: Tradition
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+ :dedent: 0
+ :start-after: [start workflow_declare]
+ :end-before: [end workflow_declare]
+
+.. tab:: Task Decorator
-We could find more detail about process definition in
-:ref:`concept about process definition <concept:process definition>` if you interested in it.
-For all arguments of object process definition, you could find in the
-:class:`pydolphinscheduler.core.process_definition` api documentation.
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+ :dedent: 0
+ :start-after: [start workflow_declare]
+ :end-before: [end workflow_declare]
+
+We could find more detail about :code:`ProcessDefinition` in :ref:`concept about process definition <concept:process definition>`
+if you are interested in it. For all arguments of object process definition, you could find in the
+:class:`pydolphinscheduler.core.process_definition` API documentation.
Task Declaration
----------------
-Here we declare four tasks, and bot of them are simple task of
-:class:`pydolphinscheduler.tasks.shell` which running `echo` command in terminal.
-Beside the argument `command`, we also need setting argument `name` for each task *(not
-only shell task, `name` is required for each type of task)*.
+.. tab:: Tradition
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
- :dedent: 0
- :start-after: [start task_declare]
- :end-before: [end task_declare]
+ We declare four tasks to show how to create tasks, and all of them are simple tasks of
+ :class:`pydolphinscheduler.tasks.shell` which runs `echo` command in the terminal. Besides the argument
+ `command` with :code:`echo` command, we also need to set the argument `name` for each task
+ *(not only shell task, `name` is required for each type of task)*.
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+ :dedent: 0
+ :start-after: [start task_declare]
+ :end-before: [end task_declare]
+
+ Besides shell task, *PyDolphinScheduler* supports multiple tasks and you could find in :doc:`tasks/index`.
+
+.. tab:: Task Decorator
-Beside shell task, *PyDolphinScheduler* support multiple tasks and you could
-find in :doc:`tasks/index`.
+ We declare four tasks to show how to create tasks, and all of them are created by the task decorator,
+ which uses :func:`pydolphinscheduler.tasks.func_wrap.task`. All we have to do is add a decorator named
+ :code:`@task` to an existing Python function, and then use them inside :class:`pydolphinscheduler.core.process_definition`
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+ :dedent: 0
+ :start-after: [start task_declare]
+ :end-before: [end task_declare]
+
+ It makes our workflow more Pythonic, but be careful that when we use the task decorator mode it means we can
+ only use Python functions as tasks and cannot use the :doc:`built-in tasks <tasks/index>` in most of the cases.
Setting Task Dependence
-----------------------
-After we declare both process definition and task, we have one workflow with
-four tasks, both all tasks is independent so that they would run in parallel.
-We should reorder the sort and the dependence of tasks. It useful when we need
-run prepare task before we run actual task or we need tasks running is specific
-rule. We both support attribute `set_downstream` and `set_upstream`, or bitwise
-operators `>>` and `<<`.
+After we declare both process definition and task, we have four tasks that are independent and will be running
+in parallel. If you want to start one task until some task is finished, you have to set dependence on those
+tasks.
-In this example, we set task `task_parent` is the upstream task of task
-`task_child_one` and `task_child_two`, and task `task_union` is the downstream
-task of both these two task.
+Set task dependence is quite easy by task's attribute :code:`set_downstream` and :code:`set_upstream` or by
+bitwise operators :code:`>>` and :code:`<<`
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
- :dedent: 0
- :start-after: [start task_relation_declare]
- :end-before: [end task_relation_declare]
+In this tutorial, task `task_parent` is the leading task of the whole workflow, then task `task_child_one` and
+task `task_child_two` are its downstream tasks. Task `task_union` will not run unless both task `task_child_one`
+and task `task_child_two` are done, because both of those tasks are `task_union`'s upstream.
+
+.. tab:: Tradition
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+ :dedent: 0
+ :start-after: [start task_relation_declare]
+ :end-before: [end task_relation_declare]
-Please notice that we could grouping some tasks and set dependence if they have
-same downstream or upstream. We declare task `task_child_one` and `task_child_two`
-as a group here, named as `task_group` and set task `task_parent` as upstream of
-both of them. You could see more detail in :ref:`concept:Tasks Dependence` section in concept
-documentation.
+.. tab:: Task Decorator
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+ :dedent: 0
+ :start-after: [start task_relation_declare]
+ :end-before: [end task_relation_declare]
+
+.. note::
+
+ We could set task dependence in batch mode if they have the same downstream or upstream by declaring those
+ tasks as task groups. In tutorial, We declare task `task_child_one` and `task_child_two` as task group named
+ `task_group`, then set `task_group` as downstream of task `task_parent`. You could see more detail in
+ :ref:`concept:Tasks Dependence` for more detail about how to set task dependence.
Submit Or Run Workflow
----------------------
-Now we finish our workflow definition, with task and task dependence, but all
-these things are in local, we should let Apache DolphinScheduler daemon know what we
-define our workflow. So the last thing we have to do here is submit our workflow to
-Apache DolphinScheduler daemon.
+After that, we finish our workflow definition, with four tasks and task dependence, but all these things are
+local, we should let the DolphinScheduler daemon know the definition of our workflow. So the last thing we
+have to do is submit the workflow to the DolphinScheduler daemon.
-We here in the example using `ProcessDefinition` attribute `run` to submit workflow
-to the daemon, and set the schedule time we just declare in `process definition declaration`_.
+Fortunately, we have a convenient method to submit workflow via `ProcessDefinition` attribute :code:`run` which
+will create workflow definition as well as workflow schedule.
-Now, we could run the Python code like other Python script, for the basic usage run
-:code:`python tutorial.py` to trigger and run it.
+.. tab:: Tradition
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+ :dedent: 0
+ :start-after: [start submit_or_run]
+ :end-before: [end submit_or_run]
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
- :dedent: 0
- :start-after: [start submit_or_run]
- :end-before: [end submit_or_run]
+.. tab:: Task Decorator
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+ :dedent: 0
+ :start-after: [start submit_or_run]
+ :end-before: [end submit_or_run]
+
+At last, we could execute this workflow code in your terminal like other Python scripts, running
+:code:`python tutorial.py` to trigger and execute it.
+
+.. note::
-If you not start your Apache DolphinScheduler server, you could find the way in
-:ref:`start:start Python gateway server` and it would have more detail about related server
-start. Beside attribute `run`, we have attribute `submit` for object `ProcessDefinition`
-and it just submit workflow to the daemon but not setting the schedule information. For
-more detail you could see :ref:`concept:process definition`.
+ If you do not start your DolphinScheduler API server, you could find how to start it in
+ :ref:`start:start Python gateway service` for more detail. Besides attribute :code:`run`, we have attribute
+ :code:`submit` for object `ProcessDefinition` which just submits workflow to the daemon but does not set
+ the workflow schedule information. For more detail, you could see :ref:`concept:process definition`.
DAG Graph After Tutorial Run
----------------------------
-After we run the tutorial code, you could login Apache DolphinScheduler web UI,
-go and see the `DolphinScheduler project page`_. they is a new process definition be
-created and named "Tutorial". It create by *PyDolphinScheduler* and the DAG graph as below
+After we run the tutorial code, you could log in to the DolphinScheduler web UI, go and see the
+`DolphinScheduler project page`_. There is a new process definition created by *PyDolphinScheduler* and it is
+named "tutorial" or "tutorial_decorator". The task graph of the workflow looks like below:
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
:language: text
diff --git a/pytest.ini b/pytest.ini
index f2c7ae6..b1aa850 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -14,9 +14,8 @@
# limitations under the License.
[pytest]
-# Do not test test_java_gateway.py due to we can not mock java gateway for now
-addopts = --ignore=tests/test_java_gateway.py
-
# add path here to skip pytest scan it
norecursedirs =
tests/testing
+ # Integration tests run separately and do not calculate coverage, they will run in `tox -e integrate-test`
+ tests/integration
diff --git a/setup.py b/setup.py
index 81e1f4f..b1e3742 100644
--- a/setup.py
+++ b/setup.py
@@ -32,11 +32,13 @@ if sys.version_info[0] < 3:
logger = logging.getLogger(__name__)
-version = "2.0.7"
+version = "3.0.0"
# Start package required
prod = [
+ "click>=8.0.0",
"py4j~=0.10",
+ "ruamel.yaml",
]
build = [
@@ -48,12 +50,17 @@ build = [
doc = [
"sphinx>=4.3",
"sphinx_rtd_theme>=1.0",
+ "sphinx-click>=3.0",
+ "sphinx-inline-tabs",
+ "sphinx-copybutton>=0.4.0",
]
test = [
"pytest>=6.2",
"freezegun>=1.1",
"coverage>=6.1",
+ "pytest-cov>=3.0",
+ "docker>=5.0.3",
]
style = [
@@ -61,6 +68,7 @@ style = [
"flake8-docstrings>=1.6",
"flake8-black>=0.2",
"isort>=5.10",
+ "autoflake>=1.4",
]
dev = style + test + doc + build
@@ -91,11 +99,9 @@ class CleanCommand(Command):
def initialize_options(self) -> None:
"""Set default values for options."""
- pass
def finalize_options(self) -> None:
"""Set final values for options."""
- pass
def run(self) -> None:
"""Run and remove temporary files."""
@@ -130,8 +136,10 @@ setup(
project_urls={
"Homepage": "https://dolphinscheduler.apache.org",
"Documentation": "https://dolphinscheduler.apache.org/python/index.html",
- "Source": "https://github.com/apache/dolphinscheduler/dolphinscheduler-python/pydolphinscheduler",
- "Issue Tracker": "https://github.com/apache/dolphinscheduler/issues",
+ "Source": "https://github.com/apache/dolphinscheduler/tree/dev/dolphinscheduler-python/"
+ "pydolphinscheduler",
+ "Issue Tracker": "https://github.com/apache/dolphinscheduler/issues?"
+ "q=is%3Aissue+is%3Aopen+label%3APython",
"Discussion": "https://github.com/apache/dolphinscheduler/discussions",
"Twitter": "https://twitter.com/dolphinschedule",
},
@@ -139,7 +147,7 @@ setup(
package_dir={"": "src"},
include_package_data=True,
package_data={
- "examples": ["examples.tutorial.py"],
+ "pydolphinscheduler": ["core/default_config.yaml"],
},
platforms=["any"],
classifiers=[
@@ -173,4 +181,9 @@ setup(
cmdclass={
"pre_clean": CleanCommand,
},
+ entry_points={
+ "console_scripts": [
+ "pydolphinscheduler = pydolphinscheduler.cli.commands:cli",
+ ],
+ },
)
diff --git a/src/pydolphinscheduler/core/__init__.py b/src/pydolphinscheduler/cli/__init__.py
similarity index 73%
copy from src/pydolphinscheduler/core/__init__.py
copy to src/pydolphinscheduler/cli/__init__.py
index 31dc944..5f30c83 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/src/pydolphinscheduler/cli/__init__.py
@@ -15,14 +15,4 @@
# specific language governing permissions and limitations
# under the License.
-"""Init pydolphinscheduler.core package."""
-
-from pydolphinscheduler.core.database import Database
-from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.task import Task
-
-__all__ = [
- "ProcessDefinition",
- "Task",
- "Database",
-]
+"""Commands line interface of pydolphinscheduler."""
diff --git a/src/pydolphinscheduler/cli/commands.py b/src/pydolphinscheduler/cli/commands.py
new file mode 100644
index 0000000..e2ca86b
--- /dev/null
+++ b/src/pydolphinscheduler/cli/commands.py
@@ -0,0 +1,92 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Commands line interface's command of pydolphinscheduler."""
+
+import click
+from click import echo
+
+from pydolphinscheduler import __version__
+from pydolphinscheduler.core.configuration import (
+ get_single_config,
+ init_config_file,
+ set_single_config,
+)
+
+version_option_val = ["major", "minor", "micro"]
+
+
+@click.group()
+def cli():
+ """Apache DolphinScheduler Python API's command line interface."""
+
+
+@cli.command()
+@click.option(
+ "--part",
+ "-p",
+ required=False,
+ type=click.Choice(version_option_val, case_sensitive=False),
+ multiple=False,
+ help="The part of version your want to get.",
+)
+def version(part: str) -> None:
+ """Show current version of pydolphinscheduler."""
+ if part:
+ idx = version_option_val.index(part)
+ echo(f"{__version__.split('.')[idx]}")
+ else:
+ echo(f"{__version__}")
+
+
+@cli.command()
+@click.option(
+ "--init",
+ "-i",
+ is_flag=True,
+ help="Initialize and create configuration file to `PYDS_HOME`.",
+)
+@click.option(
+ "--set",
+ "-s",
+ "setter",
+ multiple=True,
+ type=click.Tuple([str, str]),
+ help="Set specific setting to config file."
+ "Use multiple ``--set <KEY> <VAL>`` options to set multiple configs",
+)
+@click.option(
+ "--get",
+ "-g",
+ "getter",
+ multiple=True,
+ type=str,
+ help="Get specific setting from config file."
+ "Use multiple ``--get <KEY>`` options to get multiple configs",
+)
+def config(getter, setter, init) -> None:
+ """Manage the configuration for pydolphinscheduler."""
+ if init:
+ init_config_file()
+ elif getter:
+ click.echo("The configuration query as below:\n")
+ configs_kv = [f"{key} = {get_single_config(key)}" for key in getter]
+ click.echo("\n".join(configs_kv))
+ elif setter:
+ for key, val in setter:
+ set_single_config(key, val)
+ click.echo("Set configuration done.")
diff --git a/src/pydolphinscheduler/constants.py b/src/pydolphinscheduler/constants.py
index 65bf6c5..a5089ac 100644
--- a/src/pydolphinscheduler/constants.py
+++ b/src/pydolphinscheduler/constants.py
@@ -18,29 +18,6 @@
"""Constants for pydolphinscheduler."""
-class ProcessDefinitionReleaseState:
- """Constants for :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` release state."""
-
- ONLINE: str = "ONLINE"
- OFFLINE: str = "OFFLINE"
-
-
-class ProcessDefinitionDefault:
- """Constants default value for :class:`pydolphinscheduler.core.process_definition.ProcessDefinition`."""
-
- PROJECT: str = "project-pydolphin"
- TENANT: str = "tenant_pydolphin"
- USER: str = "userPythonGateway"
- # TODO simple set password same as username
- USER_PWD: str = "userPythonGateway"
- USER_EMAIL: str = "userPythonGateway@dolphinscheduler.com"
- USER_PHONE: str = "11111111111"
- USER_STATE: int = 1
- QUEUE: str = "queuePythonGateway"
- WORKER_GROUP: str = "default"
- TIME_ZONE: str = "Asia/Shanghai"
-
-
class TaskPriority(str):
"""Constants for task priority."""
@@ -99,10 +76,6 @@ class JavaGatewayDefault(str):
RESULT_DATA = "data"
- SERVER_ADDRESS = "127.0.0.1"
- SERVER_PORT = 25333
- AUTO_CONVERT = True
-
class Delimiter(str):
"""Constants for delimiter."""
@@ -127,3 +100,9 @@ class Time(str):
FMT_STD_TIME = "%H:%M:%S"
FMT_NO_COLON_TIME = "%H%M%S"
+
+
+class ResourceKey(str):
+ """Constants for key of resource."""
+
+ ID = "id"
diff --git a/src/pydolphinscheduler/core/__init__.py b/src/pydolphinscheduler/core/__init__.py
index 31dc944..7497d1f 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/src/pydolphinscheduler/core/__init__.py
@@ -18,10 +18,12 @@
"""Init pydolphinscheduler.core package."""
from pydolphinscheduler.core.database import Database
+from pydolphinscheduler.core.engine import Engine
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import Task
__all__ = [
+ "Engine",
"ProcessDefinition",
"Task",
"Database",
diff --git a/src/pydolphinscheduler/core/base_side.py b/src/pydolphinscheduler/core/base_side.py
index ed20d70..dca1f12 100644
--- a/src/pydolphinscheduler/core/base_side.py
+++ b/src/pydolphinscheduler/core/base_side.py
@@ -19,7 +19,7 @@
from typing import Optional
-from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base import Base
@@ -34,7 +34,7 @@ class BaseSide(Base):
cls,
# TODO comment for avoiding cycle import
# user: Optional[User] = ProcessDefinitionDefault.USER
- user=ProcessDefinitionDefault.USER,
+ user=configuration.WORKFLOW_USER,
):
"""Create Base if not exists."""
raise NotImplementedError
diff --git a/src/pydolphinscheduler/core/configuration.py b/src/pydolphinscheduler/core/configuration.py
new file mode 100644
index 0000000..860f986
--- /dev/null
+++ b/src/pydolphinscheduler/core/configuration.py
@@ -0,0 +1,193 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Configuration module for pydolphinscheduler."""
+import os
+from pathlib import Path
+from typing import Any
+
+from pydolphinscheduler.exceptions import PyDSConfException
+from pydolphinscheduler.utils import file
+from pydolphinscheduler.utils.yaml_parser import YamlParser
+
+BUILD_IN_CONFIG_PATH = Path(__file__).resolve().parent.joinpath("default_config.yaml")
+
+
+def config_path() -> Path:
+ """Get the path of pydolphinscheduler configuration file."""
+ pyds_home = os.environ.get("PYDS_HOME", "~/pydolphinscheduler")
+ config_file_path = Path(pyds_home).joinpath("config.yaml").expanduser()
+ return config_file_path
+
+
+def get_configs() -> YamlParser:
+ """Get all configuration settings from configuration file.
+
+ Will use custom configuration file first if it exists, otherwise default configuration file in
+ default path.
+ """
+ path = str(config_path()) if config_path().exists() else BUILD_IN_CONFIG_PATH
+ with open(path, mode="r") as f:
+ return YamlParser(f.read())
+
+
+def init_config_file() -> None:
+ """Initialize configuration file by default configs."""
+ if config_path().exists():
+ raise PyDSConfException(
+ "Initialize configuration false to avoid overwrite configure by accident, file already exists "
+ "in %s, if you wan to overwrite the exists configure please remove the exists file manually.",
+ str(config_path()),
+ )
+ file.write(content=str(get_configs()), to_path=str(config_path()))
+
+
+def get_single_config(key: str) -> Any:
+ """Get single config to configuration file.
+
+ Support get from nested keys by delimiter ``.``.
+
+ For example, yaml config as below:
+
+ .. code-block:: yaml
+
+ one:
+ two1:
+ three: value1
+ two2: value2
+
+ you could get ``value1`` and ``value2`` by nested path
+
+ .. code-block:: python
+
+ value1 = get_single_config("one.two1.three")
+ value2 = get_single_config("one.two2")
+
+ :param key: The config key want to get it value.
+ """
+ config = get_configs()
+ if key not in config:
+ raise PyDSConfException(
+ "Configuration path %s do not exists. Can not get configuration.", key
+ )
+ return config[key]
+
+
+def set_single_config(key: str, value: Any) -> None:
+ """Change single config to configuration file.
+
+ For example, yaml config as below:
+
+ .. code-block:: yaml
+
+ one:
+ two1:
+ three: value1
+ two2: value2
+
+ you could change ``value1`` to ``value3``, also change ``value2`` to ``value4`` by nested path assigned
+
+ .. code-block:: python
+
+ set_single_config["one.two1.three"] = "value3"
+ set_single_config["one.two2"] = "value4"
+
+ :param key: The config key want change.
+ :param value: The new value want to set.
+ """
+ config = get_configs()
+ if key not in config:
+ raise PyDSConfException(
+ "Configuration path %s do not exists. Can not set configuration.", key
+ )
+ config[key] = value
+ file.write(content=str(config), to_path=str(config_path()), overwrite=True)
+
+
+def get_int(val: Any) -> int:
+ """Convert value to int."""
+ return int(val)
+
+
+def get_bool(val: Any) -> bool:
+ """Convert value to boolean."""
+ if isinstance(val, str):
+ return val.lower() in {"true", "t"}
+ elif isinstance(val, int):
+ return val == 1
+ else:
+ return bool(val)
+
+
+# Start Common Configuration Settings
+
+# Add configs as module variables to avoid read configuration multiple times when
+# Get common configuration setting
+# Set or get multiple configs in single time
+configs: YamlParser = get_configs()
+
+# Java Gateway Settings
+JAVA_GATEWAY_ADDRESS = os.environ.get(
+ "PYDS_JAVA_GATEWAY_ADDRESS", configs.get("java_gateway.address")
+)
+JAVA_GATEWAY_PORT = get_int(
+ os.environ.get("PYDS_JAVA_GATEWAY_PORT", configs.get("java_gateway.port"))
+)
+JAVA_GATEWAY_AUTO_CONVERT = get_bool(
+ os.environ.get(
+ "PYDS_JAVA_GATEWAY_AUTO_CONVERT", configs.get("java_gateway.auto_convert")
+ )
+)
+
+# User Settings
+USER_NAME = os.environ.get("PYDS_USER_NAME", configs.get("default.user.name"))
+USER_PASSWORD = os.environ.get(
+ "PYDS_USER_PASSWORD", configs.get("default.user.password")
+)
+USER_EMAIL = os.environ.get("PYDS_USER_EMAIL", configs.get("default.user.email"))
+USER_PHONE = str(os.environ.get("PYDS_USER_PHONE", configs.get("default.user.phone")))
+USER_STATE = get_int(
+ os.environ.get("PYDS_USER_STATE", configs.get("default.user.state"))
+)
+
+# Workflow Settings
+WORKFLOW_PROJECT = os.environ.get(
+ "PYDS_WORKFLOW_PROJECT", configs.get("default.workflow.project")
+)
+WORKFLOW_TENANT = os.environ.get(
+ "PYDS_WORKFLOW_TENANT", configs.get("default.workflow.tenant")
+)
+WORKFLOW_USER = os.environ.get(
+ "PYDS_WORKFLOW_USER", configs.get("default.workflow.user")
+)
+WORKFLOW_QUEUE = os.environ.get(
+ "PYDS_WORKFLOW_QUEUE", configs.get("default.workflow.queue")
+)
+WORKFLOW_RELEASE_STATE = os.environ.get(
+ "PYDS_WORKFLOW_RELEASE_STATE", configs.get("default.workflow.release_state")
+)
+WORKFLOW_WORKER_GROUP = os.environ.get(
+ "PYDS_WORKFLOW_WORKER_GROUP", configs.get("default.workflow.worker_group")
+)
+WORKFLOW_TIME_ZONE = os.environ.get(
+ "PYDS_WORKFLOW_TIME_ZONE", configs.get("default.workflow.time_zone")
+)
+WORKFLOW_WARNING_TYPE = os.environ.get(
+ "PYDS_WORKFLOW_WARNING_TYPE", configs.get("default.workflow.warning_type")
+)
+
+# End Common Configuration Setting
diff --git a/src/pydolphinscheduler/core/default_config.yaml b/src/pydolphinscheduler/core/default_config.yaml
new file mode 100644
index 0000000..5541af7
--- /dev/null
+++ b/src/pydolphinscheduler/core/default_config.yaml
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Setting about Java gateway server
+java_gateway:
+ # The address of the Python gateway server. Set its value to `0.0.0.0` if your Python API runs on a different
+ # host than the Python gateway server. It could be specific to other addresses like `127.0.0.1` or `localhost`
+ address: 127.0.0.1
+
+ # The port of Python gateway server start. Define which port you could connect to Python gateway server from
+ # Python API side.
+ port: 25333
+
+ # Whether to automatically convert Python objects to Java objects. Default value is ``True``. There is some
+ # performance loss when set to ``True`` but for now pydolphinscheduler does not handle the conversion issue
+ # between Java and Python, mark it as a TODO item for the future.
+ auto_convert: true
+
+# Setting about dolphinscheduler default value, will use the value set below if property do not set, which
+# including ``user``, ``workflow``
+default:
+ # Default value for dolphinscheduler's user object
+ user:
+ name: userPythonGateway
+ password: userPythonGateway
+ email: userPythonGateway@dolphinscheduler.com
+ tenant: tenant_pydolphin
+ phone: 11111111111
+ state: 1
+ # Default value for dolphinscheduler's workflow object
+ workflow:
+ project: project-pydolphin
+ tenant: tenant_pydolphin
+ user: userPythonGateway
+ queue: queuePythonGateway
+ worker_group: default
+ # Release state of the workflow, default value is ``online`` which means setting the workflow online when it
+ # is submitted to the Java gateway, if you want to set the workflow offline set its value to ``offline``
+ release_state: online
+ time_zone: Asia/Shanghai
+ # Warning type of the workflow, default value is ``NONE`` which means do not warn users in any case of workflow state,
+ # change to ``FAILURE`` if you want to warn users when workflow failed. All available enum value are
+ # ``NONE``, ``SUCCESS``, ``FAILURE``, ``ALL``
+ warning_type: NONE
diff --git a/src/pydolphinscheduler/core/process_definition.py b/src/pydolphinscheduler/core/process_definition.py
index 1c123fc..63e0808 100644
--- a/src/pydolphinscheduler/core/process_definition.py
+++ b/src/pydolphinscheduler/core/process_definition.py
@@ -21,11 +21,8 @@ import json
from datetime import datetime
from typing import Any, Dict, List, Optional, Set
-from pydolphinscheduler.constants import (
- ProcessDefinitionDefault,
- ProcessDefinitionReleaseState,
- TaskType,
-)
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
from pydolphinscheduler.java_gateway import launch_gateway
@@ -58,6 +55,17 @@ class ProcessDefinition(Base):
"""process definition object, will define process definition attribute, task, relation.
TODO: maybe we should rename this class, currently use DS object name.
+
+ :param user: The user for the current process definition. Will create a new one if it does not exist. If your
+ parameter ``project`` already exists but the project's creator does not belong to ``user``, will grant
+ ``project`` to ``user`` automatically.
+ :param project: The project for the current process definition. You could see the workflow in this project
+ through the Web UI after it :func:`submit` or :func:`run`. It will create a new project belonging to
+ ``user`` if it does not exist. And when ``project`` exists but the project's creator does not belong
+ to ``user``, will grant `project` to ``user`` automatically.
+ :param resource_list: Resource files required by the current process definition. You can create and modify
+ resource files from this field. When the process definition is submitted, these resource files are
+ also submitted along with it.
"""
# key attribute for identify ProcessDefinition object
@@ -75,12 +83,15 @@ class ProcessDefinition(Base):
"_project",
"_tenant",
"worker_group",
+ "warning_type",
+ "warning_group_id",
"timeout",
"release_state",
"param",
"tasks",
"task_definition_json",
"task_relation_json",
+ "resource_list",
}
def __init__(
@@ -90,15 +101,17 @@ class ProcessDefinition(Base):
schedule: Optional[str] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
- timezone: Optional[str] = ProcessDefinitionDefault.TIME_ZONE,
- user: Optional[str] = ProcessDefinitionDefault.USER,
- project: Optional[str] = ProcessDefinitionDefault.PROJECT,
- tenant: Optional[str] = ProcessDefinitionDefault.TENANT,
- queue: Optional[str] = ProcessDefinitionDefault.QUEUE,
- worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
+ timezone: Optional[str] = configuration.WORKFLOW_TIME_ZONE,
+ user: Optional[str] = configuration.WORKFLOW_USER,
+ project: Optional[str] = configuration.WORKFLOW_PROJECT,
+ tenant: Optional[str] = configuration.WORKFLOW_TENANT,
+ worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
+ warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
+ warning_group_id: Optional[int] = 0,
timeout: Optional[int] = 0,
- release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
+ release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
param: Optional[Dict] = None,
+ resource_list: Optional[List] = None,
):
super().__init__(name, description)
self.schedule = schedule
@@ -108,15 +121,23 @@ class ProcessDefinition(Base):
self._user = user
self._project = project
self._tenant = tenant
- self._queue = queue
self.worker_group = worker_group
+ self.warning_type = warning_type
+ if warning_type.strip().upper() not in ("FAILURE", "SUCCESS", "ALL", "NONE"):
+ raise PyDSParamException(
+ "Parameter `warning_type` with unexpect value `%s`", warning_type
+ )
+ else:
+ self.warning_type = warning_type.strip().upper()
+ self.warning_group_id = warning_group_id
self.timeout = timeout
- self.release_state = release_state
+ self._release_state = release_state
self.param = param
self.tasks: dict = {}
# TODO how to fix circle import
self._task_relations: set["TaskRelation"] = set() # noqa: F821
self._process_definition_code = None
+ self.resource_list = resource_list or []
def __enter__(self) -> "ProcessDefinition":
ProcessDefinitionContext.set(self)
@@ -151,15 +172,7 @@ class ProcessDefinition(Base):
For now we just get from python side but not from java gateway side, so it may not correct.
"""
- return User(
- self._user,
- ProcessDefinitionDefault.USER_PWD,
- ProcessDefinitionDefault.USER_EMAIL,
- ProcessDefinitionDefault.USER_PHONE,
- self._tenant,
- self._queue,
- ProcessDefinitionDefault.USER_STATE,
- )
+ return User(name=self._user, tenant=self._tenant)
@staticmethod
def _parse_datetime(val: Any) -> Any:
@@ -190,6 +203,25 @@ class ProcessDefinition(Base):
"""Set attribute end_time."""
self._end_time = val
+ @property
+ def release_state(self) -> int:
+ """Get attribute release_state."""
+ rs_ref = {
+ "online": 1,
+ "offline": 0,
+ }
+ if self._release_state not in rs_ref:
+ raise PyDSParamException(
+ "Parameter release_state only support `online` or `offline` but get %",
+ self._release_state,
+ )
+ return rs_ref[self._release_state]
+
+ @release_state.setter
+ def release_state(self, val: str) -> None:
+ """Set attribute release_state."""
+ self._release_state = val.lower()
+
@property
def param_json(self) -> Optional[List[Dict]]:
"""Return param json base on self.param."""
@@ -334,8 +366,6 @@ class ProcessDefinition(Base):
:class:`pydolphinscheduler.constants.ProcessDefinitionDefault`.
"""
# TODO used metaclass for more pythonic
- self.tenant.create_if_not_exists(self._queue)
- # model User have to create after Tenant created
self.user.create_if_not_exists()
# Project model need User object exists
self.project.create_if_not_exists(self._user)
@@ -345,14 +375,16 @@ class ProcessDefinition(Base):
This method should be called before process definition submit to java gateway
For now, we have below checker:
- * `self.param` should be set if task `switch` in this workflow.
+ * `self.param` or at least one local param of task should be set if task `switch` in this workflow.
"""
if (
any([task.task_type == TaskType.SWITCH for task in self.tasks.values()])
and self.param is None
+ and all([len(task.local_params) == 0 for task in self.tasks.values()])
):
raise PyDSParamException(
- "Parameter param must be provider if task Switch in process definition."
+ "Parameter param or at least one local_param of task must "
+ "be provider if task Switch in process definition."
)
def submit(self) -> int:
@@ -368,15 +400,26 @@ class ProcessDefinition(Base):
str(self.description) if self.description else "",
json.dumps(self.param_json),
json.dumps(self.schedule_json) if self.schedule_json else None,
+ self.warning_type,
+ self.warning_group_id,
json.dumps(self.task_location),
self.timeout,
self.worker_group,
self._tenant,
+ self.release_state,
# TODO add serialization function
json.dumps(self.task_relation_json),
json.dumps(self.task_definition_json),
None,
)
+ if len(self.resource_list) > 0:
+ for res in self.resource_list:
+ gateway.entry_point.createOrUpdateResource(
+ self._user,
+ res.name,
+ res.description,
+ res.content,
+ )
return self._process_definition_code
def start(self) -> None:
@@ -391,5 +434,7 @@ class ProcessDefinition(Base):
self.name,
"",
self.worker_group,
+ self.warning_type,
+ self.warning_group_id,
24 * 3600,
)
diff --git a/src/pydolphinscheduler/core/base_side.py b/src/pydolphinscheduler/core/resource.py
similarity index 62%
copy from src/pydolphinscheduler/core/base_side.py
copy to src/pydolphinscheduler/core/resource.py
index ed20d70..bd4ffd4 100644
--- a/src/pydolphinscheduler/core/base_side.py
+++ b/src/pydolphinscheduler/core/resource.py
@@ -15,26 +15,29 @@
# specific language governing permissions and limitations
# under the License.
-"""Module for side object."""
+"""Module resource."""
from typing import Optional
-from pydolphinscheduler.constants import ProcessDefinitionDefault
from pydolphinscheduler.core.base import Base
-class BaseSide(Base):
- """Base class for side object, it declare base behavior for them."""
+class Resource(Base):
+ """resource object, will define the resources that you want to create or update.
- def __init__(self, name: str, description: Optional[str] = None):
- super().__init__(name, description)
+ :param name: The fullname of resource. Includes path and suffix.
+ :param content: The content of resource.
+ :param description: The description of resource.
+ """
+
+ _DEFINE_ATTR = {"name", "content", "description"}
- @classmethod
- def create_if_not_exists(
- cls,
- # TODO comment for avoiding cycle import
- # user: Optional[User] = ProcessDefinitionDefault.USER
- user=ProcessDefinitionDefault.USER,
+ def __init__(
+ self,
+ name: str,
+ content: str,
+ description: Optional[str] = None,
):
- """Create Base if not exists."""
- raise NotImplementedError
+ super().__init__(name, description)
+ self.content = content
+ self._resource_code = None
diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py
index 693f508..90c0e89 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -17,16 +17,17 @@
"""DolphinScheduler Task and TaskRelation object."""
-import logging
+from logging import getLogger
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
from pydolphinscheduler.constants import (
Delimiter,
- ProcessDefinitionDefault,
+ ResourceKey,
TaskFlag,
TaskPriority,
TaskTimeoutFlag,
)
+from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.core.process_definition import (
ProcessDefinition,
@@ -34,6 +35,8 @@ from pydolphinscheduler.core.process_definition import (
)
from pydolphinscheduler.java_gateway import launch_gateway
+logger = getLogger(__name__)
+
class TaskRelation(Base):
"""TaskRelation object, describe the relation of exactly two tasks."""
@@ -104,7 +107,7 @@ class Task(Base):
description: Optional[str] = None,
flag: Optional[str] = TaskFlag.YES,
task_priority: Optional[str] = TaskPriority.MEDIUM,
- worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
+ worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
delay_time: Optional[int] = 0,
fail_retry_times: Optional[int] = 0,
fail_retry_interval: Optional[int] = 1,
@@ -146,14 +149,14 @@ class Task(Base):
):
self.process_definition.add_task(self)
else:
- logging.warning(
+ logger.warning(
"Task code %d already in process definition, prohibit re-add task.",
self.code,
)
# Attribute for task param
self.local_params = local_params or []
- self.resource_list = resource_list or []
+ self._resource_list = resource_list or []
self.dependence = dependence or {}
self.wait_start_timeout = wait_start_timeout or {}
self._condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
@@ -168,6 +171,22 @@ class Task(Base):
"""Set attribute process_definition."""
self._process_definition = process_definition
+ @property
+ def resource_list(self) -> List:
+ """Get task define attribute `resource_list`."""
+ resources = set()
+ for resource in self._resource_list:
+ if type(resource) == str:
+ resources.add(self.query_resource(resource).get(ResourceKey.ID))
+ elif type(resource) == dict and resource.get(ResourceKey.ID) is not None:
+ logger.warning(
+ """`resource_list` should be defined using List[str] with resource paths,
+ the use of ids to define resources will be remove in version 3.2.0.
+ """
+ )
+ resources.add(resource.get(ResourceKey.ID))
+ return [{ResourceKey.ID: r} for r in resources]
+
@property
def condition_result(self) -> Dict:
"""Get attribute condition_result."""
@@ -271,8 +290,15 @@ class Task(Base):
# TODO get code from specific project process definition and task name
gateway = launch_gateway()
result = gateway.entry_point.getCodeAndVersion(
- self.process_definition._project, self.name
+ self.process_definition._project, self.process_definition.name, self.name
)
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
# gateway_result_checker(result)
return result.get("code"), result.get("version")
+
+ def query_resource(self, full_name):
+ """Get resource info from java gateway, contains resource id, name."""
+ gateway = launch_gateway()
+ return gateway.entry_point.queryResourcesFileInfo(
+ self.process_definition.user.name, full_name
+ )
diff --git a/src/pydolphinscheduler/examples/task_dependent_example.py b/src/pydolphinscheduler/examples/task_dependent_example.py
index ae19d95..88d6ea2 100644
--- a/src/pydolphinscheduler/examples/task_dependent_example.py
+++ b/src/pydolphinscheduler/examples/task_dependent_example.py
@@ -35,7 +35,7 @@ task_dependent:
task_dependent(this task dependent on task_dependent_external.task_1 and task_dependent_external.task_2).
"""
-from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.dependent import And, Dependent, DependentItem, Or
from pydolphinscheduler.tasks.shell import Shell
@@ -58,12 +58,12 @@ with ProcessDefinition(
dependence=And(
Or(
DependentItem(
- project_name=ProcessDefinitionDefault.PROJECT,
+ project_name=configuration.WORKFLOW_PROJECT,
process_definition_name="task_dependent_external",
dependent_task_name="task_1",
),
DependentItem(
- project_name=ProcessDefinitionDefault.PROJECT,
+ project_name=configuration.WORKFLOW_PROJECT,
process_definition_name="task_dependent_external",
dependent_task_name="task_2",
),
diff --git a/src/pydolphinscheduler/examples/tutorial_decorator.py b/src/pydolphinscheduler/examples/tutorial_decorator.py
new file mode 100644
index 0000000..986c1bb
--- /dev/null
+++ b/src/pydolphinscheduler/examples/tutorial_decorator.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+r"""
+A tutorial example take you to experience pydolphinscheduler.
+
+After tutorial_decorator.py file is submitted to Apache DolphinScheduler server, a DAG would be created,
+and the workflow DAG graph is as below:
+
+ --> task_child_one
+ / \
+task_parent --> --> task_union
+ \ /
+ --> task_child_two
+
+it will instantiate and run all the tasks it has.
+"""
+
+# [start tutorial]
+# [start package_import]
+# Import ProcessDefinition object to define your workflow attributes
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+
+# Import task Shell object cause we would create some shell tasks later
+from pydolphinscheduler.tasks.func_wrap import task
+
+# [end package_import]
+
+
+# [start task_declare]
+@task
+def task_parent():
+ """First task in this workflow."""
+ print("echo hello pydolphinscheduler")
+
+
+@task
+def task_child_one():
+ """Child task will be run parallel after task ``task_parent`` finished."""
+ print("echo 'child one'")
+
+
+@task
+def task_child_two():
+ """Child task will be run parallel after task ``task_parent`` finished."""
+ print("echo 'child two'")
+
+
+@task
+def task_union():
+ """Last task in this workflow."""
+ print("echo union")
+
+
+# [end task_declare]
+
+
+# [start workflow_declare]
+with ProcessDefinition(
+ name="tutorial_decorator",
+ schedule="0 0 0 * * ? *",
+ start_time="2021-01-01",
+ tenant="tenant_exists",
+) as pd:
+ # [end workflow_declare]
+
+ # [start task_relation_declare]
+ task_group = [task_child_one(), task_child_two()]
+ task_parent().set_downstream(task_group)
+
+ task_union() << task_group
+ # [end task_relation_declare]
+
+ # [start submit_or_run]
+ pd.run()
+ # [end submit_or_run]
+# [end tutorial]
diff --git a/src/pydolphinscheduler/exceptions.py b/src/pydolphinscheduler/exceptions.py
index 745ef3e..4d70a58 100644
--- a/src/pydolphinscheduler/exceptions.py
+++ b/src/pydolphinscheduler/exceptions.py
@@ -21,26 +21,22 @@
class PyDSBaseException(Exception):
"""Base exception for pydolphinscheduler."""
- pass
-
class PyDSParamException(PyDSBaseException):
"""Exception for pydolphinscheduler parameter verify error."""
- pass
-
class PyDSTaskNoFoundException(PyDSBaseException):
"""Exception for pydolphinscheduler workflow task no found error."""
- pass
-
class PyDSJavaGatewayException(PyDSBaseException):
"""Exception for pydolphinscheduler Java gateway error."""
- pass
-
class PyDSProcessDefinitionNotAssignException(PyDSBaseException):
"""Exception for pydolphinscheduler process definition not assign error."""
+
+
+class PyDSConfException(PyDSBaseException):
+ """Exception for pydolphinscheduler configuration error."""
diff --git a/src/pydolphinscheduler/java_gateway.py b/src/pydolphinscheduler/java_gateway.py
index 2876ed5..8560638 100644
--- a/src/pydolphinscheduler/java_gateway.py
+++ b/src/pydolphinscheduler/java_gateway.py
@@ -23,6 +23,7 @@ from py4j.java_collections import JavaMap
from py4j.java_gateway import GatewayParameters, JavaGateway
from pydolphinscheduler.constants import JavaGatewayDefault
+from pydolphinscheduler.core import configuration
from pydolphinscheduler.exceptions import PyDSJavaGatewayException
@@ -38,9 +39,9 @@ def launch_gateway(
This is why automatic conversion is disabled by default.
"""
gateway_parameters = GatewayParameters(
- address=address or JavaGatewayDefault.SERVER_ADDRESS,
- port=port or JavaGatewayDefault.SERVER_PORT,
- auto_convert=auto_convert or JavaGatewayDefault.AUTO_CONVERT,
+ address=address or configuration.JAVA_GATEWAY_ADDRESS,
+ port=port or configuration.JAVA_GATEWAY_PORT,
+ auto_convert=auto_convert or configuration.JAVA_GATEWAY_AUTO_CONVERT,
)
gateway = JavaGateway(gateway_parameters=gateway_parameters)
return gateway
diff --git a/src/pydolphinscheduler/side/project.py b/src/pydolphinscheduler/side/project.py
index 02382ca..750e3b8 100644
--- a/src/pydolphinscheduler/side/project.py
+++ b/src/pydolphinscheduler/side/project.py
@@ -19,7 +19,7 @@
from typing import Optional
-from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.java_gateway import launch_gateway
@@ -29,14 +29,14 @@ class Project(BaseSide):
def __init__(
self,
- name: str = ProcessDefinitionDefault.PROJECT,
+ name: str = configuration.WORKFLOW_PROJECT,
description: Optional[str] = None,
):
super().__init__(name, description)
- def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
+ def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
"""Create Project if not exists."""
gateway = launch_gateway()
- gateway.entry_point.createProject(user, self.name, self.description)
+ gateway.entry_point.createOrGrantProject(user, self.name, self.description)
# TODO recover result checker
# gateway_result_checker(result, None)
diff --git a/src/pydolphinscheduler/side/queue.py b/src/pydolphinscheduler/side/queue.py
index 9d6664e..e7c68e1 100644
--- a/src/pydolphinscheduler/side/queue.py
+++ b/src/pydolphinscheduler/side/queue.py
@@ -19,7 +19,7 @@
from typing import Optional
-from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.java_gateway import gateway_result_checker, launch_gateway
@@ -29,12 +29,12 @@ class Queue(BaseSide):
def __init__(
self,
- name: str = ProcessDefinitionDefault.QUEUE,
+ name: str = configuration.WORKFLOW_QUEUE,
description: Optional[str] = "",
):
super().__init__(name, description)
- def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
+ def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
"""Create Queue if not exists."""
gateway = launch_gateway()
# Here we set Queue.name and Queue.queueName same as self.name
diff --git a/src/pydolphinscheduler/side/tenant.py b/src/pydolphinscheduler/side/tenant.py
index 508c033..6aaabfe 100644
--- a/src/pydolphinscheduler/side/tenant.py
+++ b/src/pydolphinscheduler/side/tenant.py
@@ -19,7 +19,7 @@
from typing import Optional
-from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.java_gateway import launch_gateway
@@ -29,15 +29,15 @@ class Tenant(BaseSide):
def __init__(
self,
- name: str = ProcessDefinitionDefault.TENANT,
- queue: str = ProcessDefinitionDefault.QUEUE,
+ name: str = configuration.WORKFLOW_TENANT,
+ queue: str = configuration.WORKFLOW_QUEUE,
description: Optional[str] = None,
):
super().__init__(name, description)
self.queue = queue
def create_if_not_exists(
- self, queue_name: str, user=ProcessDefinitionDefault.USER
+ self, queue_name: str, user=configuration.USER_NAME
) -> None:
"""Create Tenant if not exists."""
gateway = launch_gateway()
diff --git a/src/pydolphinscheduler/side/user.py b/src/pydolphinscheduler/side/user.py
index cd0145a..510e3a8 100644
--- a/src/pydolphinscheduler/side/user.py
+++ b/src/pydolphinscheduler/side/user.py
@@ -19,8 +19,10 @@
from typing import Optional
+from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.side.tenant import Tenant
class User(BaseSide):
@@ -39,12 +41,12 @@ class User(BaseSide):
def __init__(
self,
name: str,
- password: str,
- email: str,
- phone: str,
- tenant: str,
- queue: Optional[str] = None,
- status: Optional[int] = 1,
+ password: Optional[str] = configuration.USER_PASSWORD,
+ email: Optional[str] = configuration.USER_EMAIL,
+ phone: Optional[str] = configuration.USER_PHONE,
+ tenant: Optional[str] = configuration.WORKFLOW_TENANT,
+ queue: Optional[str] = configuration.WORKFLOW_QUEUE,
+ status: Optional[int] = configuration.USER_STATE,
):
super().__init__(name)
self.password = password
@@ -54,8 +56,15 @@ class User(BaseSide):
self.queue = queue
self.status = status
+ def create_tenant_if_not_exists(self) -> None:
+ """Create tenant object."""
+ tenant = Tenant(name=self.tenant, queue=self.queue)
+ tenant.create_if_not_exists(self.queue)
+
def create_if_not_exists(self, **kwargs):
"""Create User if not exists."""
+ # Should make sure queue already exists.
+ self.create_tenant_if_not_exists()
gateway = launch_gateway()
gateway.entry_point.createUser(
self.name,
diff --git a/src/pydolphinscheduler/tasks/func_wrap.py b/src/pydolphinscheduler/tasks/func_wrap.py
new file mode 100644
index 0000000..c0b73a1
--- /dev/null
+++ b/src/pydolphinscheduler/tasks/func_wrap.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task function wrapper allows using decorator to create a task."""
+
+import functools
+import inspect
+import itertools
+import types
+
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.tasks.python import Python
+
+
+def _get_func_str(func: types.FunctionType) -> str:
+ """Get Python function string without indent from decorator.
+
+ :param func: The function which is wrapped by the decorator ``@task``.
+ """
+ lines = inspect.getsourcelines(func)[0]
+
+ src_strip = ""
+ lead_space_num = None
+ for line in lines:
+ if lead_space_num is None:
+ lead_space_num = sum(1 for _ in itertools.takewhile(str.isspace, line))
+ if line.strip() == "@task":
+ continue
+ elif line.strip().startswith("@"):
+ raise PyDSParamException(
+ "Do no support other decorators for function ``task`` decorator."
+ )
+ src_strip += line[lead_space_num:]
+ return src_strip
+
+
+def task(func: types.FunctionType):
+ """Decorate which covert Python function into pydolphinscheduler task."""
+
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ func_str = _get_func_str(func)
+ return Python(
+ name=kwargs.get("name", func.__name__), definition=func_str, *args, **kwargs
+ )
+
+ return wrapper
diff --git a/src/pydolphinscheduler/tasks/python.py b/src/pydolphinscheduler/tasks/python.py
index 7950480..52903d4 100644
--- a/src/pydolphinscheduler/tasks/python.py
+++ b/src/pydolphinscheduler/tasks/python.py
@@ -18,34 +18,85 @@
"""Task Python."""
import inspect
+import logging
+import re
import types
-from typing import Any
+from typing import Union
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSParamException
+log = logging.getLogger(__file__)
+
class Python(Task):
- """Task Python object, declare behavior for Python task to dolphinscheduler."""
+ """Task Python object, declare behavior for Python task to dolphinscheduler.
+
+ Python task support two types of parameters for :param:``code``, and here is an example:
+
+ Using str type of :param:``code``
+
+ .. code-block:: python
+
+ python_task = Python(name="str_type", code="print('Hello Python task.')")
+
+ Or using Python callable type of :param:``code``
+
+ .. code-block:: python
+
+ def foo():
+ print("Hello Python task.")
+
+ python_task = Python(name="str_type", code=foo)
+
+ :param name: The name for Python task. It define the task name.
+ :param definition: String format of Python script you want to execute or Python callable you
+ want to execute.
+ """
_task_custom_attr = {
"raw_script",
}
- def __init__(self, name: str, code: Any, *args, **kwargs):
+ def __init__(
+ self, name: str, definition: Union[str, types.FunctionType], *args, **kwargs
+ ):
super().__init__(name, TaskType.PYTHON, *args, **kwargs)
- self._code = code
+ self.definition = definition
+
+ def _build_exe_str(self) -> str:
+ """Build executable string from given definition.
+
+ Attribute ``self.definition`` is usually a function, and we need to call this function after parsing it
+ to a string. The easiest way to call a function is using the syntax ``func()``, and we use it here too.
+ """
+ if isinstance(self.definition, types.FunctionType):
+ py_function = inspect.getsource(self.definition)
+ func_str = f"{py_function}{self.definition.__name__}()"
+ else:
+ pattern = re.compile("^def (\\w+)\\(")
+ find = pattern.findall(self.definition)
+ if not find:
+ log.warning(
+ "Python definition is simple script instead of function, with value %s",
+ self.definition,
+ )
+ return self.definition
+ # Keep function str and function callable always have one blank line
+ func_str = (
+ f"{self.definition}{find[0]}()"
+ if self.definition.endswith("\n")
+ else f"{self.definition}\n{find[0]}()"
+ )
+ return func_str
@property
def raw_script(self) -> str:
"""Get python task define attribute `raw_script`."""
- if isinstance(self._code, str):
- return self._code
- elif isinstance(self._code, types.FunctionType):
- py_function = inspect.getsource(self._code)
- return py_function
+ if isinstance(self.definition, (str, types.FunctionType)):
+ return self._build_exe_str()
else:
raise PyDSParamException(
- "Parameter code do not support % for now.", type(self._code)
+ "Parameter definition do not support % for now.", type(self.definition)
)
diff --git a/src/pydolphinscheduler/tasks/sql.py b/src/pydolphinscheduler/tasks/sql.py
index b5be3e4..a125982 100644
--- a/src/pydolphinscheduler/tasks/sql.py
+++ b/src/pydolphinscheduler/tasks/sql.py
@@ -17,6 +17,7 @@
"""Task sql."""
+import logging
import re
from typing import Dict, Optional
@@ -24,12 +25,14 @@ from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.database import Database
from pydolphinscheduler.core.task import Task
+log = logging.getLogger(__file__)
+
class SqlType:
"""SQL type, for now it just contain `SELECT` and `NO_SELECT`."""
- SELECT = 0
- NOT_SELECT = 1
+ SELECT = "0"
+ NOT_SELECT = "1"
class Sql(Task):
@@ -61,6 +64,7 @@ class Sql(Task):
name: str,
datasource_name: str,
sql: str,
+ sql_type: Optional[str] = None,
pre_statements: Optional[str] = None,
post_statements: Optional[str] = None,
display_rows: Optional[int] = 10,
@@ -69,16 +73,32 @@ class Sql(Task):
):
super().__init__(name, TaskType.SQL, *args, **kwargs)
self.sql = sql
+ self.param_sql_type = sql_type
self.datasource_name = datasource_name
self.pre_statements = pre_statements or []
self.post_statements = post_statements or []
self.display_rows = display_rows
@property
- def sql_type(self) -> int:
- """Judgement sql type, use regexp to check which type of the sql is."""
+ def sql_type(self) -> str:
+ """Judgement sql type, it will return the SQL type for type `SELECT` or `NOT_SELECT`.
+
+ If `param_sql_type` dot not specific, will use regexp to check
+ which type of the SQL is. But if `param_sql_type` is specific
+ will use the parameter overwrites the regexp way
+ """
+ if (
+ self.param_sql_type == SqlType.SELECT
+ or self.param_sql_type == SqlType.NOT_SELECT
+ ):
+ log.info(
+ "The sql type is specified by a parameter, with value %s",
+ self.param_sql_type,
+ )
+ return self.param_sql_type
pattern_select_str = (
- "^(?!(.* |)insert |(.* |)delete |(.* |)drop |(.* |)update |(.* |)alter ).*"
+ "^(?!(.* |)insert |(.* |)delete |(.* |)drop "
+ "|(.* |)update |(.* |)alter |(.* |)create ).*"
)
pattern_select = re.compile(pattern_select_str, re.IGNORECASE)
if pattern_select.match(self.sql) is None:
diff --git a/src/pydolphinscheduler/tasks/switch.py b/src/pydolphinscheduler/tasks/switch.py
index 28032f8..0c9a2b8 100644
--- a/src/pydolphinscheduler/tasks/switch.py
+++ b/src/pydolphinscheduler/tasks/switch.py
@@ -129,7 +129,11 @@ class SwitchCondition(Base):
class Switch(Task):
- """Task switch object, declare behavior for switch task to dolphinscheduler."""
+ """Task switch object, declare behavior for switch task to dolphinscheduler.
+
+ Param of process definition or at least one local param of a task must be set
+ if a `switch` task is in this workflow.
+ """
def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs):
super().__init__(name, TaskType.SWITCH, *args, **kwargs)
diff --git a/src/pydolphinscheduler/utils/file.py b/src/pydolphinscheduler/utils/file.py
new file mode 100644
index 0000000..075b902
--- /dev/null
+++ b/src/pydolphinscheduler/utils/file.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""File util for pydolphinscheduler."""
+
+from pathlib import Path
+from typing import Optional
+
+
+def write(
+ content: str,
+ to_path: str,
+ create: Optional[bool] = True,
+ overwrite: Optional[bool] = False,
+) -> None:
+ """Write configs dict to configuration file.
+
+ :param content: The source string want to write to :param:`to_path`.
+ :param to_path: The path want to write content.
+ :param create: Whether create the file parent directory or not if it does not exist.
+ If set ``True`` will create file with :param:`to_path` if path not exists, otherwise
+ ``False`` will not create. Default ``True``.
+ :param overwrite: Whether overwrite the file or not if it exists. If set ``True``
+ will overwrite the exists content, otherwise ``False`` will not overwrite it. Default ``True``.
+ """
+ path = Path(to_path)
+ if not path.parent.exists():
+ if create:
+ path.parent.mkdir(parents=True)
+ else:
+ raise ValueError(
+ "Parent directory do not exists and set param `create` to `False`."
+ )
+ if not path.exists():
+ with path.open(mode="w") as f:
+ f.write(content)
+ elif overwrite:
+ with path.open(mode="w") as f:
+ f.write(content)
+ else:
+ raise FileExistsError(
+ "File %s already exists and you choose not overwrite mode.", to_path
+ )
diff --git a/src/pydolphinscheduler/utils/yaml_parser.py b/src/pydolphinscheduler/utils/yaml_parser.py
new file mode 100644
index 0000000..46ee08c
--- /dev/null
+++ b/src/pydolphinscheduler/utils/yaml_parser.py
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""YAML parser utils, parser yaml string to ``ruamel.yaml`` object and nested key dict."""
+
+import copy
+import io
+from typing import Any, Dict, Optional
+
+from ruamel.yaml import YAML
+from ruamel.yaml.comments import CommentedMap
+
+
+class YamlParser:
+ """A parser to parse Yaml file and provider easier way to access or change value.
+
+ This parser provider delimiter string key to get or set :class:`ruamel.yaml.YAML` object
+
+ For example, yaml config named ``test.yaml`` and its content as below:
+
+ .. code-block:: yaml
+
+ one:
+ two1:
+ three: value1
+ two2: value2
+
+ you could get ``value1`` and ``value2`` by nested path
+
+ .. code-block:: python
+
+ yaml_parser = YamlParser("test.yaml")
+
+ # Use function ``get`` to get value
+ value1 = yaml_parser.get("one.two1.three")
+ # Or use build-in ``__getitem__`` to get value
+ value2 = yaml_parser["one.two2"]
+
+ or you could change ``value1`` to ``value3``, also change ``value2`` to ``value4`` by nested path assigned
+
+ .. code-block:: python
+
+ yaml_parser["one.two1.three"] = "value3"
+ yaml_parser["one.two2"] = "value4"
+ """
+
+ def __init__(self, content: str, delimiter: Optional[str] = "."):
+ self._content = content
+ self.src_parser = content
+ self._delimiter = delimiter
+
+ @property
+ def src_parser(self) -> CommentedMap:
+ """Get src_parser property."""
+ return self._src_parser
+
+ @src_parser.setter
+ def src_parser(self, content: str) -> None:
+ """Set src_parser property."""
+ self._yaml = YAML()
+ self._src_parser = self._yaml.load(content)
+
+ def parse_nested_dict(
+ self, result: Dict, commented_map: CommentedMap, key: str
+ ) -> None:
+ """Parse :class:`ruamel.yaml.comments.CommentedMap` to nested dict using :param:`delimiter`."""
+ if not isinstance(commented_map, CommentedMap):
+ return
+ for sub_key in set(commented_map.keys()):
+ next_key = f"{key}{self._delimiter}{sub_key}"
+ result[next_key] = commented_map[sub_key]
+ self.parse_nested_dict(result, commented_map[sub_key], next_key)
+
+ @property
+ def dict_parser(self) -> Dict:
+ """Get :class:`CommentedMap` to nested dict using :param:`delimiter` as key delimiter.
+
+ Use Depth-First-Search to get all nested keys and values, with all keys joined by :param:`delimiter`.
+ It makes it easier for users to access or change the :class:`CommentedMap` object.
+
+ For example, yaml config named ``test.yaml`` and its content as below:
+
+ .. code-block:: yaml
+
+ one:
+ two1:
+ three: value1
+ two2: value2
+
+ It could parser to nested dict as
+
+ .. code-block:: python
+
+ {
+ "one": ordereddict([('two1', ordereddict([('three', 'value1')])), ('two2', 'value2')]),
+ "one.two1": ordereddict([('three', 'value1')]),
+ "one.two1.three": "value1",
+ "one.two2": "value2",
+ }
+ """
+ res = dict()
+ src_parser_copy = copy.deepcopy(self.src_parser)
+
+ base_keys = set(src_parser_copy.keys())
+ if not base_keys:
+ return res
+ else:
+ for key in base_keys:
+ res[key] = src_parser_copy[key]
+ self.parse_nested_dict(res, src_parser_copy[key], key)
+ return res
+
+ def __contains__(self, key) -> bool:
+ return key in self.dict_parser
+
+ def __getitem__(self, key: str) -> Any:
+ return self.dict_parser[key]
+
+ def __setitem__(self, key: str, val: Any) -> None:
+ if key not in self.dict_parser:
+ raise KeyError("Key %s do not exists.", key)
+
+ mid = None
+ keys = key.split(self._delimiter)
+ for idx, k in enumerate(keys, 1):
+ if idx == len(keys):
+ mid[k] = val
+ else:
+ mid = mid[k] if mid else self.src_parser[k]
+
+ def get(self, key: str) -> Any:
+ """Get value by key, is call ``__getitem__``."""
+ return self[key]
+
+ def __str__(self) -> str:
+ """Transfer :class:`YamlParser` to string object.
+
+ It is useful when users want to output the :class:`YamlParser` object they change just now.
+ """
+ buf = io.StringIO()
+ self._yaml.dump(self.src_parser, buf)
+ return buf.getvalue()
+
+ def __repr__(self) -> str:
+ return f"YamlParser({str(self)})"
diff --git a/src/pydolphinscheduler/core/__init__.py b/tests/cli/__init__.py
similarity index 73%
copy from src/pydolphinscheduler/core/__init__.py
copy to tests/cli/__init__.py
index 31dc944..f1a4396 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/tests/cli/__init__.py
@@ -15,14 +15,4 @@
# specific language governing permissions and limitations
# under the License.
-"""Init pydolphinscheduler.core package."""
-
-from pydolphinscheduler.core.database import Database
-from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.task import Task
-
-__all__ = [
- "ProcessDefinition",
- "Task",
- "Database",
-]
+"""Init command line interface tests."""
diff --git a/tests/cli/test_config.py b/tests/cli/test_config.py
new file mode 100644
index 0000000..d913277
--- /dev/null
+++ b/tests/cli/test_config.py
@@ -0,0 +1,198 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test command line interface subcommand `config`."""
+
+import os
+from pathlib import Path
+
+import pytest
+
+from pydolphinscheduler.cli.commands import cli
+from pydolphinscheduler.core.configuration import BUILD_IN_CONFIG_PATH, config_path
+from tests.testing.cli import CliTestWrapper
+from tests.testing.constants import DEV_MODE, ENV_PYDS_HOME
+from tests.testing.file import get_file_content
+
+config_file = "config.yaml"
+
+
+@pytest.fixture
+def teardown_file_env():
+ """Util for deleting temp configuration file and pop env var after test finish."""
+ yield
+ config_file_path = config_path()
+ if config_file_path.exists():
+ config_file_path.unlink()
+ # pop environment variable to keep test cases independent
+ os.environ.pop(ENV_PYDS_HOME, None)
+ assert ENV_PYDS_HOME not in os.environ
+
+
+@pytest.mark.parametrize(
+ "home",
+ [
+ None,
+ "/tmp/pydolphinscheduler",
+ "/tmp/test_abc",
+ ],
+)
+def test_config_init(teardown_file_env, home):
+ """Test command line interface `config --init`."""
+ if home:
+ os.environ[ENV_PYDS_HOME] = home
+ elif DEV_MODE:
+ pytest.skip(
+ "Avoid delete ~/pydolphinscheduler/config.yaml by accident when test locally."
+ )
+
+ config_file_path = config_path()
+ assert not config_file_path.exists()
+
+ cli_test = CliTestWrapper(cli, ["config", "--init"])
+ cli_test.assert_success()
+
+ assert config_file_path.exists()
+ assert get_file_content(config_file_path) == get_file_content(BUILD_IN_CONFIG_PATH)
+
+
+@pytest.mark.parametrize(
+ "key, expect",
+ [
+ # We test each key in one single section
+ ("java_gateway.address", "127.0.0.1"),
+ ("default.user.name", "userPythonGateway"),
+ ("default.workflow.project", "project-pydolphin"),
+ ],
+)
+def test_config_get(teardown_file_env, key: str, expect: str):
+ """Test command line interface `config --get XXX`."""
+ os.environ[ENV_PYDS_HOME] = "/tmp/pydolphinscheduler"
+ cli_test = CliTestWrapper(cli, ["config", "--init"])
+ cli_test.assert_success()
+
+ cli_test = CliTestWrapper(cli, ["config", "--get", key])
+ cli_test.assert_success(output=f"{key} = {expect}", fuzzy=True)
+
+
+@pytest.mark.parametrize(
+ "keys, expects",
+ [
+ # We test mix section keys
+ (("java_gateway.address", "java_gateway.port"), ("127.0.0.1", "25333")),
+ (
+ ("java_gateway.auto_convert", "default.user.tenant"),
+ ("True", "tenant_pydolphin"),
+ ),
+ (
+ (
+ "java_gateway.port",
+ "default.user.state",
+ "default.workflow.worker_group",
+ ),
+ ("25333", "1", "default"),
+ ),
+ ],
+)
+def test_config_get_multiple(teardown_file_env, keys: str, expects: str):
+ """Test command line interface `config --get KEY1 --get KEY2 ...`."""
+ os.environ[ENV_PYDS_HOME] = "/tmp/pydolphinscheduler"
+ cli_test = CliTestWrapper(cli, ["config", "--init"])
+ cli_test.assert_success()
+
+ get_args = ["config"]
+ for key in keys:
+ get_args.append("--get")
+ get_args.append(key)
+ cli_test = CliTestWrapper(cli, get_args)
+
+ for idx, expect in enumerate(expects):
+ cli_test.assert_success(output=f"{keys[idx]} = {expect}", fuzzy=True)
+
+
+@pytest.mark.parametrize(
+ "key, value",
+ [
+ # We test each key in one single section
+ ("java_gateway.address", "127.1.1.1"),
+ ("default.user.name", "editUserPythonGateway"),
+ ("default.workflow.project", "edit-project-pydolphin"),
+ ],
+)
+def test_config_set(teardown_file_env, key: str, value: str):
+ """Test command line interface `config --set KEY VALUE`."""
+ path = "/tmp/pydolphinscheduler"
+ assert not Path(path).joinpath(config_file).exists()
+ os.environ[ENV_PYDS_HOME] = path
+ cli_test = CliTestWrapper(cli, ["config", "--init"])
+ cli_test.assert_success()
+
+ # Make sure the value does not exist first
+ cli_test = CliTestWrapper(cli, ["config", "--get", key])
+ assert f"{key} = {value}" not in cli_test.result.output
+
+ cli_test = CliTestWrapper(cli, ["config", "--set", key, value])
+ cli_test.assert_success()
+
+ cli_test = CliTestWrapper(cli, ["config", "--get", key])
+ assert f"{key} = {value}" in cli_test.result.output
+
+
+@pytest.mark.parametrize(
+ "keys, values",
+ [
+ # We test each key in mixture section
+ (("java_gateway.address", "java_gateway.port"), ("127.1.1.1", "25444")),
+ (
+ ("java_gateway.auto_convert", "default.user.tenant"),
+ ("False", "edit_tenant_pydolphin"),
+ ),
+ (
+ (
+ "java_gateway.port",
+ "default.user.state",
+ "default.workflow.worker_group",
+ ),
+ ("25555", "0", "not-default"),
+ ),
+ ],
+)
+def test_config_set_multiple(teardown_file_env, keys: str, values: str):
+ """Test command line interface `config --set KEY1 VAL1 --set KEY2 VAL2`."""
+ path = "/tmp/pydolphinscheduler"
+ assert not Path(path).joinpath(config_file).exists()
+ os.environ[ENV_PYDS_HOME] = path
+ cli_test = CliTestWrapper(cli, ["config", "--init"])
+ cli_test.assert_success()
+
+ set_args = ["config"]
+ for idx, key in enumerate(keys):
+ # Make sure the values do not exist first
+ cli_test = CliTestWrapper(cli, ["config", "--get", key])
+ assert f"{key} = {values[idx]}" not in cli_test.result.output
+
+ set_args.append("--set")
+ set_args.append(key)
+ set_args.append(values[idx])
+
+ cli_test = CliTestWrapper(cli, set_args)
+ cli_test.assert_success()
+
+ for idx, key in enumerate(keys):
+ # Make sure the values exist after `config --set` runs
+ cli_test = CliTestWrapper(cli, ["config", "--get", key])
+ assert f"{key} = {values[idx]}" in cli_test.result.output
diff --git a/tests/cli/test_version.py b/tests/cli/test_version.py
new file mode 100644
index 0000000..f0dcb0e
--- /dev/null
+++ b/tests/cli/test_version.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test command line interface subcommand `version`."""
+
+import pytest
+
+from pydolphinscheduler import __version__
+from pydolphinscheduler.cli.commands import cli
+from tests.testing.cli import CliTestWrapper
+
+
+def test_version():
+ """Test whether subcommand `version` correct."""
+ cli_test = CliTestWrapper(cli, ["version"])
+ cli_test.assert_success(output=f"{__version__}")
+
+
+@pytest.mark.parametrize(
+ "part, idx",
+ [
+ ("major", 0),
+ ("minor", 1),
+ ("micro", 2),
+ ],
+)
+def test_version_part(part: str, idx: int):
+ """Test subcommand `version` option `--part`."""
+ cli_test = CliTestWrapper(cli, ["version", "--part", part])
+ cli_test.assert_success(output=f"{__version__.split('.')[idx]}")
+
+
+@pytest.mark.parametrize(
+ "option, output",
+ [
+ # not support option
+ (["version", "--not-support"], "No such option"),
+ # not support option value
+ (["version", "--part", "abc"], "Invalid value for '--part'"),
+ ],
+)
+def test_version_not_support_option(option, output):
+ """Test subcommand `version` not support option or option value."""
+ cli_test = CliTestWrapper(cli, option)
+ cli_test.assert_fail(ret_code=2, output=output, fuzzy=True)
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
new file mode 100644
index 0000000..c7e217a
--- /dev/null
+++ b/tests/core/test_configuration.py
@@ -0,0 +1,272 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test class :mod:`pydolphinscheduler.core.configuration`' method."""
+
+import importlib
+import os
+from pathlib import Path
+from typing import Any
+
+import pytest
+
+from pydolphinscheduler.core import configuration
+from pydolphinscheduler.core.configuration import (
+ BUILD_IN_CONFIG_PATH,
+ config_path,
+ get_single_config,
+ set_single_config,
+)
+from pydolphinscheduler.exceptions import PyDSConfException
+from pydolphinscheduler.utils.yaml_parser import YamlParser
+from tests.testing.constants import DEV_MODE, ENV_PYDS_HOME
+from tests.testing.file import get_file_content
+
+
+@pytest.fixture
+def teardown_file_env():
+ """Util for deleting temp configuration file and pop env var after test finish."""
+ yield
+ config_file_path = config_path()
+ if config_file_path.exists():
+ config_file_path.unlink()
+ os.environ.pop(ENV_PYDS_HOME, None)
+
+
+@pytest.mark.parametrize(
+ "val, expect",
+ [
+ ("1", 1),
+ ("123", 123),
+ ("4567", 4567),
+ (b"1234", 1234),
+ ],
+)
+def test_get_int(val: Any, expect: int):
+ """Test function :func:`configuration.get_int`."""
+ assert configuration.get_int(val) == expect
+
+
+@pytest.mark.parametrize(
+ "val",
+ [
+ "a",
+ "1a",
+ "1d2",
+ "1723-",
+ ],
+)
+def test_get_int_error(val: Any):
+ """Test function :func:`configuration.get_int`."""
+ with pytest.raises(ValueError):
+ configuration.get_int(val)
+
+
+@pytest.mark.parametrize(
+ "val, expect",
+ [
+ ("t", True),
+ ("true", True),
+ (1, True),
+ (True, True),
+ ("f", False),
+ ("false", False),
+ (0, False),
+ (123, False),
+ ("abc", False),
+ ("abc1", False),
+ (False, False),
+ ],
+)
+def test_get_bool(val: Any, expect: bool):
+ """Test function :func:`configuration.get_bool`."""
+ assert configuration.get_bool(val) == expect
+
+
+@pytest.mark.parametrize(
+ "home, expect",
+ [
+ (None, "~/pydolphinscheduler/config.yaml"),
+ ("/tmp/pydolphinscheduler", "/tmp/pydolphinscheduler/config.yaml"),
+ ("/tmp/test_abc", "/tmp/test_abc/config.yaml"),
+ ],
+)
+def test_config_path(home: Any, expect: str):
+ """Test function :func:`config_path`."""
+ if home:
+ os.environ[ENV_PYDS_HOME] = home
+ assert Path(expect).expanduser() == configuration.config_path()
+
+
+@pytest.mark.parametrize(
+ "home",
+ [
+ None,
+ "/tmp/pydolphinscheduler",
+ "/tmp/test_abc",
+ ],
+)
+def test_init_config_file(teardown_file_env, home: Any):
+ """Test init config file."""
+ if home:
+ os.environ[ENV_PYDS_HOME] = home
+ elif DEV_MODE:
+ pytest.skip(
+ "Avoid delete ~/pydolphinscheduler/config.yaml by accident when test locally."
+ )
+ assert not config_path().exists()
+ configuration.init_config_file()
+ assert config_path().exists()
+
+ assert get_file_content(config_path()) == get_file_content(BUILD_IN_CONFIG_PATH)
+
+
+@pytest.mark.parametrize(
+ "home",
+ [
+ None,
+ "/tmp/pydolphinscheduler",
+ "/tmp/test_abc",
+ ],
+)
+def test_init_config_file_duplicate(teardown_file_env, home: Any):
+ """Test raise error with init config file which already exists."""
+ if home:
+ os.environ[ENV_PYDS_HOME] = home
+ elif DEV_MODE:
+ pytest.skip(
+ "Avoid delete ~/pydolphinscheduler/config.yaml by accident when test locally."
+ )
+ assert not config_path().exists()
+ configuration.init_config_file()
+ assert config_path().exists()
+
+ with pytest.raises(PyDSConfException, match=".*file already exists.*"):
+ configuration.init_config_file()
+
+
+def test_get_configs_build_in():
+ """Test function :func:`get_configs` with build-in config file."""
+ content = get_file_content(BUILD_IN_CONFIG_PATH)
+ assert YamlParser(content).src_parser == configuration.get_configs().src_parser
+ assert YamlParser(content).dict_parser == configuration.get_configs().dict_parser
+
+
+@pytest.mark.parametrize(
+ "key, val, new_val",
+ [
+ ("java_gateway.address", "127.0.0.1", "127.1.1.1"),
+ ("java_gateway.port", 25333, 25555),
+ ("java_gateway.auto_convert", True, False),
+ ("default.user.name", "userPythonGateway", "editUserPythonGateway"),
+ ("default.user.password", "userPythonGateway", "editUserPythonGateway"),
+ (
+ "default.user.email",
+ "userPythonGateway@dolphinscheduler.com",
+ "userPythonGateway@edit.com",
+ ),
+ ("default.user.phone", 11111111111, 22222222222),
+ ("default.user.state", 1, 0),
+ ("default.workflow.project", "project-pydolphin", "eidt-project-pydolphin"),
+ ("default.workflow.tenant", "tenant_pydolphin", "edit_tenant_pydolphin"),
+ ("default.workflow.user", "userPythonGateway", "editUserPythonGateway"),
+ ("default.workflow.queue", "queuePythonGateway", "editQueuePythonGateway"),
+ ("default.workflow.worker_group", "default", "specific"),
+ ("default.workflow.time_zone", "Asia/Shanghai", "Asia/Beijing"),
+ ("default.workflow.warning_type", "NONE", "ALL"),
+ ],
+)
+def test_single_config_get_set(teardown_file_env, key: str, val: Any, new_val: Any):
+ """Test function :func:`get_single_config` and :func:`set_single_config`."""
+ assert val == get_single_config(key)
+ set_single_config(key, new_val)
+ assert new_val == get_single_config(key)
+
+
+def test_single_config_get_set_not_exists_key():
+ """Test function :func:`get_single_config` and :func:`set_single_config` error while key not exists."""
+ not_exists_key = "i_am_not_exists_key"
+ with pytest.raises(PyDSConfException, match=".*do not exists.*"):
+ get_single_config(not_exists_key)
+ with pytest.raises(PyDSConfException, match=".*do not exists.*"):
+ set_single_config(not_exists_key, not_exists_key)
+
+
+@pytest.mark.parametrize(
+ "config_name, expect",
+ [
+ ("JAVA_GATEWAY_ADDRESS", "127.0.0.1"),
+ ("JAVA_GATEWAY_PORT", 25333),
+ ("JAVA_GATEWAY_AUTO_CONVERT", True),
+ ("USER_NAME", "userPythonGateway"),
+ ("USER_PASSWORD", "userPythonGateway"),
+ ("USER_EMAIL", "userPythonGateway@dolphinscheduler.com"),
+ ("USER_PHONE", "11111111111"),
+ ("USER_STATE", 1),
+ ("WORKFLOW_PROJECT", "project-pydolphin"),
+ ("WORKFLOW_TENANT", "tenant_pydolphin"),
+ ("WORKFLOW_USER", "userPythonGateway"),
+ ("WORKFLOW_QUEUE", "queuePythonGateway"),
+ ("WORKFLOW_WORKER_GROUP", "default"),
+ ("WORKFLOW_TIME_ZONE", "Asia/Shanghai"),
+ ("WORKFLOW_WARNING_TYPE", "NONE"),
+ ],
+)
+def test_get_configuration(config_name: str, expect: Any):
+ """Test get exists attribute in :mod:`configuration`."""
+ assert expect == getattr(configuration, config_name)
+
+
+@pytest.mark.parametrize(
+ "config_name, src, dest",
+ [
+ ("JAVA_GATEWAY_ADDRESS", "127.0.0.1", "192.168.1.1"),
+ ("JAVA_GATEWAY_PORT", 25333, 25334),
+ ("JAVA_GATEWAY_AUTO_CONVERT", True, False),
+ ("USER_NAME", "userPythonGateway", "envUserPythonGateway"),
+ ("USER_PASSWORD", "userPythonGateway", "envUserPythonGateway"),
+ (
+ "USER_EMAIL",
+ "userPythonGateway@dolphinscheduler.com",
+ "userPythonGateway@dolphinscheduler.com",
+ ),
+ ("USER_PHONE", "11111111111", "22222222222"),
+ ("USER_STATE", 1, 0),
+ ("WORKFLOW_PROJECT", "project-pydolphin", "env-project-pydolphin"),
+ ("WORKFLOW_TENANT", "tenant_pydolphin", "env-tenant_pydolphin"),
+ ("WORKFLOW_USER", "userPythonGateway", "envUserPythonGateway"),
+ ("WORKFLOW_QUEUE", "queuePythonGateway", "envQueuePythonGateway"),
+ ("WORKFLOW_WORKER_GROUP", "default", "custom"),
+ ("WORKFLOW_TIME_ZONE", "Asia/Shanghai", "America/Los_Angeles"),
+ ("WORKFLOW_WARNING_TYPE", "NONE", "ALL"),
+ ],
+)
+def test_get_configuration_env(config_name: str, src: Any, dest: Any):
+ """Test get exists attribute from environment variable in :mod:`configuration`."""
+ assert getattr(configuration, config_name) == src
+
+ env_name = f"PYDS_{config_name}"
+ os.environ[env_name] = str(dest)
+ # reload module configuration to re-get config from environment.
+ importlib.reload(configuration)
+ assert getattr(configuration, config_name) == dest
+
+ # pop and reload configuration to test whether this config equal to `src` value
+ os.environ.pop(env_name, None)
+ importlib.reload(configuration)
+ assert getattr(configuration, config_name) == src
+ assert env_name not in os.environ
diff --git a/tests/testing/task.py b/tests/core/test_default_config_yaml.py
similarity index 50%
copy from tests/testing/task.py
copy to tests/core/test_default_config_yaml.py
index e0affc9..b4d5e07 100644
--- a/tests/testing/task.py
+++ b/tests/core/test_default_config_yaml.py
@@ -15,18 +15,25 @@
# specific language governing permissions and limitations
# under the License.
-"""Mock class Task for other test."""
+"""Test default config file."""
-import uuid
+from ruamel.yaml import YAML
+from ruamel.yaml.comments import CommentedMap
-from pydolphinscheduler.core.task import Task as SourceTask
+from tests.testing.path import path_default_config_yaml
-class Task(SourceTask):
- """Mock class :class:`pydolphinscheduler.core.task.Task` for unittest."""
+def nested_key_check(comment_map: CommentedMap) -> None:
+ """Recursively check that no key in the default configuration file contains the ``.`` delimiter."""
+ for key, val in comment_map.items():
+ assert "." not in key, f"There is not allowed special character in key `{key}`."
+ if isinstance(val, CommentedMap):
+ nested_key_check(val)
- DEFAULT_VERSION = 1
- def gen_code_and_version(self):
- """Mock java gateway code and version, convenience method for unittest."""
- return uuid.uuid1().time, self.DEFAULT_VERSION
+def test_key_without_dot_delimiter():
+ """Test that no key in the default configuration file contains the ``.`` delimiter character."""
+ yaml = YAML()
+ with open(path_default_config_yaml, "r") as f:
+ comment_map = yaml.load(f.read())
+ nested_key_check(comment_map)
diff --git a/tests/core/test_process_definition.py b/tests/core/test_process_definition.py
index f51338d..5cb6dab 100644
--- a/tests/core/test_process_definition.py
+++ b/tests/core/test_process_definition.py
@@ -18,17 +18,15 @@
"""Test process definition."""
from datetime import datetime
-from typing import Any
+from typing import Any, List
from unittest.mock import patch
import pytest
from freezegun import freeze_time
-from pydolphinscheduler.constants import (
- ProcessDefinitionDefault,
- ProcessDefinitionReleaseState,
-)
+from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.side import Project, Tenant, User
from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
@@ -51,23 +49,25 @@ def test_process_definition_key_attr(func):
@pytest.mark.parametrize(
"name,value",
[
- ("timezone", ProcessDefinitionDefault.TIME_ZONE),
- ("project", Project(ProcessDefinitionDefault.PROJECT)),
- ("tenant", Tenant(ProcessDefinitionDefault.TENANT)),
+ ("timezone", configuration.WORKFLOW_TIME_ZONE),
+ ("project", Project(configuration.WORKFLOW_PROJECT)),
+ ("tenant", Tenant(configuration.WORKFLOW_TENANT)),
(
"user",
User(
- ProcessDefinitionDefault.USER,
- ProcessDefinitionDefault.USER_PWD,
- ProcessDefinitionDefault.USER_EMAIL,
- ProcessDefinitionDefault.USER_PHONE,
- ProcessDefinitionDefault.TENANT,
- ProcessDefinitionDefault.QUEUE,
- ProcessDefinitionDefault.USER_STATE,
+ configuration.USER_NAME,
+ configuration.USER_PASSWORD,
+ configuration.USER_EMAIL,
+ configuration.USER_PHONE,
+ configuration.WORKFLOW_TENANT,
+ configuration.WORKFLOW_QUEUE,
+ configuration.USER_STATE,
),
),
- ("worker_group", ProcessDefinitionDefault.WORKER_GROUP),
- ("release_state", ProcessDefinitionReleaseState.ONLINE),
+ ("worker_group", configuration.WORKFLOW_WORKER_GROUP),
+ ("warning_type", configuration.WORKFLOW_WARNING_TYPE),
+ ("warning_group_id", 0),
+ ("release_state", 1),
],
)
def test_process_definition_default_value(name, value):
@@ -87,9 +87,15 @@ def test_process_definition_default_value(name, value):
("schedule", str, "schedule"),
("timezone", str, "timezone"),
("worker_group", str, "worker_group"),
+ ("warning_type", str, "FAILURE"),
+ ("warning_group_id", int, 1),
("timeout", int, 1),
- ("release_state", str, "OFFLINE"),
("param", dict, {"key": "value"}),
+ (
+ "resource_list",
+ List,
+ [Resource(name="/dev/test.py", content="hello world", description="desc")],
+ ),
],
)
def test_set_attr(name, cls, expect):
@@ -101,6 +107,41 @@ def test_set_attr(name, cls, expect):
), f"ProcessDefinition set attribute `{name}` do not work expect"
+@pytest.mark.parametrize(
+ "value,expect",
+ [
+ ("online", 1),
+ ("offline", 0),
+ ],
+)
+def test_set_release_state(value, expect):
+ """Test process definition set release_state attributes."""
+ with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value) as pd:
+ assert (
+ getattr(pd, "release_state") == expect
+ ), "ProcessDefinition set attribute release_state do not return expect value."
+
+
+@pytest.mark.parametrize(
+ "value",
+ [
+ "oneline",
+ "offeline",
+ 1,
+ 0,
+ None,
+ ],
+)
+def test_set_release_state_error(value):
+ """Test process definition set release_state attributes with error."""
+ pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value)
+ with pytest.raises(
+ PyDSParamException,
+ match="Parameter release_state only support `online` or `offline` but get.*",
+ ):
+ pd.release_state
+
+
@pytest.mark.parametrize(
"set_attr,set_val,get_attr,get_val",
[
@@ -154,6 +195,21 @@ def test__parse_datetime_not_support_type(val: Any):
pd._parse_datetime(val)
+@pytest.mark.parametrize(
+ "val",
+ [
+ "ALLL",
+ "nonee",
+ ],
+)
+def test_warn_type_not_support_type(val: str):
+ """Test process definition param warning_type not support type error."""
+ with pytest.raises(
+ PyDSParamException, match="Parameter `warning_type` with unexpect value.*?"
+ ):
+ ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, warning_type=val)
+
+
@pytest.mark.parametrize(
"param, expect",
[
@@ -223,25 +279,55 @@ def test__pre_submit_check_switch_without_param(mock_code_version):
parent >> switch
with pytest.raises(
PyDSParamException,
- match="Parameter param must be provider if task Switch in process definition.",
+ match="Parameter param or at least one local_param of task must "
+ "be provider if task Switch in process definition.",
):
pd._pre_submit_check()
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test__pre_submit_check_switch_with_local_params(mock_code_version):
+ """Test :func:`_pre_submit_check` if process definition with switch with local params of task."""
+ with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+ parent = Task(
+ name="parent",
+ task_type=TEST_TASK_TYPE,
+ local_params=[
+ {"prop": "var", "direct": "OUT", "type": "VARCHAR", "value": ""}
+ ],
+ )
+ switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
+ switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
+ switch_condition = SwitchCondition(
+ Branch(condition="${var} > 1", task=switch_child_1),
+ Default(task=switch_child_2),
+ )
+
+ switch = Switch(name="switch", condition=switch_condition)
+ parent >> switch
+ pd._pre_submit_check()
+
+
def test_process_definition_get_define_without_task():
"""Test process definition function get_define without task."""
expect = {
"name": TEST_PROCESS_DEFINITION_NAME,
"description": None,
- "project": ProcessDefinitionDefault.PROJECT,
- "tenant": ProcessDefinitionDefault.TENANT,
- "workerGroup": ProcessDefinitionDefault.WORKER_GROUP,
+ "project": configuration.WORKFLOW_PROJECT,
+ "tenant": configuration.WORKFLOW_TENANT,
+ "workerGroup": configuration.WORKFLOW_WORKER_GROUP,
+ "warningType": configuration.WORKFLOW_WARNING_TYPE,
+ "warningGroupId": 0,
"timeout": 0,
- "releaseState": ProcessDefinitionReleaseState.ONLINE,
+ "releaseState": 1,
"param": None,
"tasks": {},
"taskDefinitionJson": [{}],
"taskRelationJson": [{}],
+ "resourceList": [],
}
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
assert pd.get_define() == expect
@@ -311,15 +397,12 @@ def test_process_definition_simple_separate():
"user_attrs",
[
{"tenant": "tenant_specific"},
- {"queue": "queue_specific"},
- {"tenant": "tenant_specific", "queue": "queue_specific"},
],
)
def test_set_process_definition_user_attr(user_attrs):
"""Test user with correct attributes if we specific assigned to process definition object."""
default_value = {
- "tenant": ProcessDefinitionDefault.TENANT,
- "queue": ProcessDefinitionDefault.QUEUE,
+ "tenant": configuration.WORKFLOW_TENANT,
}
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd:
user = pd.user
@@ -407,13 +490,13 @@ def test_schedule_json_start_and_end_time(start_time, end_time, expect_date):
"crontab": schedule,
"startTime": expect_date["start_time"],
"endTime": expect_date["end_time"],
- "timezoneId": ProcessDefinitionDefault.TIME_ZONE,
+ "timezoneId": configuration.WORKFLOW_TIME_ZONE,
}
with ProcessDefinition(
TEST_PROCESS_DEFINITION_NAME,
schedule=schedule,
start_time=start_time,
end_time=end_time,
- timezone=ProcessDefinitionDefault.TIME_ZONE,
+ timezone=configuration.WORKFLOW_TIME_ZONE,
) as pd:
assert pd.schedule_json == expect
diff --git a/tests/testing/task.py b/tests/core/test_resource_definition.py
similarity index 59%
copy from tests/testing/task.py
copy to tests/core/test_resource_definition.py
index e0affc9..ebfb893 100644
--- a/tests/testing/task.py
+++ b/tests/core/test_resource_definition.py
@@ -15,18 +15,24 @@
# specific language governing permissions and limitations
# under the License.
-"""Mock class Task for other test."""
+"""Test resource definition."""
-import uuid
+from pydolphinscheduler.core.resource import Resource
-from pydolphinscheduler.core.task import Task as SourceTask
-
-class Task(SourceTask):
- """Mock class :class:`pydolphinscheduler.core.task.Task` for unittest."""
-
- DEFAULT_VERSION = 1
-
- def gen_code_and_version(self):
- """Mock java gateway code and version, convenience method for unittest."""
- return uuid.uuid1().time, self.DEFAULT_VERSION
+def test_resource():
+ """Test that resource attributes set on construction are returned unchanged by ``get_define``."""
+ name = "/dev/test.py"
+ content = """print("hello world")"""
+ description = "hello world"
+ expect = {
+ "name": name,
+ "content": content,
+ "description": description,
+ }
+ resourceDefinition = Resource(
+ name=name,
+ content=content,
+ description=description,
+ )
+ assert resourceDefinition.get_define() == expect
diff --git a/tests/core/test_task.py b/tests/core/test_task.py
index 6af731b..65555c1 100644
--- a/tests/core/test_task.py
+++ b/tests/core/test_task.py
@@ -16,13 +16,16 @@
# under the License.
"""Test Task class function."""
-
+import logging
+import re
from unittest.mock import patch
import pytest
+from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import Task, TaskRelation
from tests.testing.task import Task as testTask
+from tests.testing.task import TaskWithCode
TEST_TASK_RELATION_SET = set()
TEST_TASK_RELATION_SIZE = 0
@@ -51,7 +54,7 @@ TEST_TASK_RELATION_SIZE = 0
},
{
"localParams": ["foo", "bar"],
- "resourceList": ["foo", "bar"],
+ "resourceList": [{"id": 1}],
"dependence": {"foo", "bar"},
"waitStartTimeout": {"foo", "bar"},
"conditionResult": {"foo": ["bar"]},
@@ -59,7 +62,11 @@ TEST_TASK_RELATION_SIZE = 0
),
],
)
-def test_property_task_params(attr, expect):
+@patch(
+ "pydolphinscheduler.core.task.Task.query_resource",
+ return_value=({"id": 1, "name": "foo"}),
+)
+def test_property_task_params(mock_resource, attr, expect):
"""Test class task property."""
task = testTask(
"test-property-task-params",
@@ -222,3 +229,50 @@ def test_tasks_list_shift(dep_expr: str, flag: str):
assert all([1 == len(getattr(t, reverse_direction_attr)) for t in tasks])
assert all([task.code in getattr(t, reverse_direction_attr) for t in tasks])
+
+
+def test_add_duplicate(caplog):
+ """Test add task which code already in process definition."""
+ with ProcessDefinition("test_add_duplicate_workflow") as _:
+ TaskWithCode(name="test_task_1", task_type="test", code=123, version=1)
+ with caplog.at_level(logging.WARNING):
+ TaskWithCode(
+ name="test_task_duplicate_code", task_type="test", code=123, version=2
+ )
+ assert all(
+ [
+ caplog.text.startswith("WARNING pydolphinscheduler"),
+ re.findall("already in process definition", caplog.text),
+ ]
+ )
+
+
+@pytest.mark.parametrize(
+ "resources, expect",
+ [
+ (
+ ["/dev/test.py"],
+ [{"id": 1}],
+ ),
+ (
+ ["/dev/test.py", {"id": 2}],
+ [{"id": 1}, {"id": 2}],
+ ),
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.query_resource",
+ return_value=({"id": 1, "name": "/dev/test.py"}),
+)
+def test_python_resource_list(mock_code_version, mock_resource, resources, expect):
+ """Test python task resource list."""
+ task = Task(
+ name="python_resource_list.",
+ task_type="PYTHON",
+ resource_list=resources,
+ )
+ assert task.resource_list == expect
diff --git a/tests/example/test_example.py b/tests/example/test_example.py
index 5bf897f..70f3677 100644
--- a/tests/example/test_example.py
+++ b/tests/example/test_example.py
@@ -44,7 +44,7 @@ def test_task_without_example():
Avoiding add new type of tasks but without adding example describe how to use it.
"""
# We use example/tutorial.py as shell task example
- ignore_name = {"__init__.py", "shell.py"}
+ ignore_name = {"__init__.py", "shell.py", "func_wrap.py"}
all_tasks = {task.stem for task in get_tasks(ignore_name=ignore_name)}
have_example_tasks = set()
@@ -97,7 +97,7 @@ def test_example_basic():
), f"We expect all examples is python script, but get {ex.name}."
# All except tutorial and __init__ is end with keyword "_example"
- if ex.stem != "tutorial" and ex.stem != "__init__":
+ if ex.stem not in ("tutorial", "tutorial_decorator") and ex.stem != "__init__":
assert ex.stem.endswith(
"_example"
), f"We expect all examples script end with keyword '_example', but get {ex.stem}."
diff --git a/src/pydolphinscheduler/core/__init__.py b/tests/integration/__init__.py
similarity index 73%
copy from src/pydolphinscheduler/core/__init__.py
copy to tests/integration/__init__.py
index 31dc944..65625a9 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/tests/integration/__init__.py
@@ -15,14 +15,4 @@
# specific language governing permissions and limitations
# under the License.
-"""Init pydolphinscheduler.core package."""
-
-from pydolphinscheduler.core.database import Database
-from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.task import Task
-
-__all__ = [
- "ProcessDefinition",
- "Task",
- "Database",
-]
+"""Test integration between Python API and PythonGatewayService."""
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
new file mode 100644
index 0000000..a9cd352
--- /dev/null
+++ b/tests/integration/conftest.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""py.test conftest.py file for package integration test."""
+
+import pytest
+
+from tests.testing.docker_wrapper import DockerWrapper
+
+
+@pytest.fixture(scope="package", autouse=True)
+def docker_setup_teardown():
+ """Fixture for whole package tests, Set up and teardown docker env.
+
+ Fixture in file named ``conftest.py`` with ``scope=package`` could be auto import in the
+ whole package, and with attribute ``autouse=True`` will be auto-use for each test cases.
+
+ .. seealso::
+ For more information about conftest.py see:
+ https://docs.pytest.org/en/latest/example/simple.html#package-directory-level-fixtures-setups
+ """
+ docker_wrapper = DockerWrapper(
+ image="apache/dolphinscheduler-standalone-server:ci",
+ container_name="ci-dolphinscheduler-standalone-server",
+ )
+ ports = {"25333/tcp": 25333}
+ container = docker_wrapper.run_until_log(
+ log="Started StandaloneServer in", tty=True, ports=ports
+ )
+ assert container is not None
+ yield
+ docker_wrapper.remove_container()
diff --git a/tests/integration/test_java_gateway.py b/tests/integration/test_java_gateway.py
new file mode 100644
index 0000000..8b7c5ff
--- /dev/null
+++ b/tests/integration/test_java_gateway.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test pydolphinscheduler java gateway."""
+
+
+from py4j.java_gateway import JavaGateway, java_import
+
+
+def test_gateway_connect():
+ """Test weather client could connect java gate way or not."""
+ gateway = JavaGateway()
+ app = gateway.entry_point
+ assert app.ping() == "PONG"
+
+
+def test_jvm_simple():
+ """Test use JVM build-in object and operator from java gateway."""
+ gateway = JavaGateway()
+ smallest = gateway.jvm.java.lang.Integer.MIN_VALUE
+ biggest = gateway.jvm.java.lang.Integer.MAX_VALUE
+ assert smallest is not None and biggest is not None
+ assert biggest > smallest
+
+
+def test_python_client_java_import_single():
+ """Test import single class from java gateway."""
+ gateway = JavaGateway()
+ java_import(gateway.jvm, "org.apache.dolphinscheduler.common.utils.FileUtils")
+ assert hasattr(gateway.jvm, "FileUtils")
+
+
+def test_python_client_java_import_package():
+ """Test import package contain multiple class from java gateway."""
+ gateway = JavaGateway()
+ java_import(gateway.jvm, "org.apache.dolphinscheduler.common.utils.*")
+ # test if jvm view have some common utils
+ for util in ("FileUtils", "OSUtils", "DateUtils"):
+ assert hasattr(gateway.jvm, util)
diff --git a/src/pydolphinscheduler/core/__init__.py b/tests/integration/test_process_definition.py
similarity index 50%
copy from src/pydolphinscheduler/core/__init__.py
copy to tests/integration/test_process_definition.py
index 31dc944..1672bde 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/tests/integration/test_process_definition.py
@@ -15,14 +15,36 @@
# specific language governing permissions and limitations
# under the License.
-"""Init pydolphinscheduler.core package."""
+"""Test process definition in integration."""
+
+from typing import Dict
+
+import pytest
-from pydolphinscheduler.core.database import Database
from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.tasks.shell import Shell
+
+PROCESS_DEFINITION_NAME = "test_change_exists_attr_pd"
+TASK_NAME = f"task_{PROCESS_DEFINITION_NAME}"
+
-__all__ = [
- "ProcessDefinition",
- "Task",
- "Database",
-]
+@pytest.mark.parametrize(
+ "pre, post",
+ [
+ (
+ {
+ "user": "pre_user",
+ },
+ {
+ "user": "post_user",
+ },
+ )
+ ],
+)
+def test_change_process_definition_attr(pre: Dict, post: Dict):
+ """Test whether process definition success when specific attribute change."""
+ assert pre.keys() == post.keys(), "Not equal keys for pre and post attribute."
+ for attrs in [pre, post]:
+ with ProcessDefinition(name=PROCESS_DEFINITION_NAME, **attrs) as pd:
+ Shell(name=TASK_NAME, command="echo 1")
+ pd.submit()
diff --git a/tests/integration/test_submit_examples.py b/tests/integration/test_submit_examples.py
new file mode 100644
index 0000000..393b0cc
--- /dev/null
+++ b/tests/integration/test_submit_examples.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test whether success submit examples DAG to PythonGatewayService."""
+
+import subprocess
+from pathlib import Path
+
+import pytest
+
+from tests.testing.constants import ignore_exec_examples
+from tests.testing.path import path_example
+
+
+@pytest.mark.parametrize(
+ "example_path",
+ [
+ path
+ for path in path_example.iterdir()
+ if path.is_file() and path.stem not in ignore_exec_examples
+ ],
+)
+def test_exec_white_list_example(example_path: Path):
+ """Test execute examples and submit DAG to PythonGatewayService."""
+ try:
+ # Because our task decorator used module ``inspect`` to get the source, and it will
+ # raise IOError when call it by built-in function ``exec``, so we change to ``subprocess.check_call``
+ subprocess.check_call(["python", str(example_path)])
+ except subprocess.CalledProcessError:
+ raise RuntimeError("Run example %s failed.", example_path.stem)
+
+
+def test_exec_multiple_times():
+ """Test whether process definition can be executed more than one times."""
+ tutorial_path = path_example.joinpath("tutorial.py")
+ time = 0
+ while time < 3:
+ try:
+ subprocess.check_call(["python", str(tutorial_path)])
+ except subprocess.CalledProcessError:
+ raise RuntimeError("Run example %s failed.", tutorial_path.stem)
+ time += 1
diff --git a/tests/tasks/test_func_wrap.py b/tests/tasks/test_func_wrap.py
new file mode 100644
index 0000000..628b6e7
--- /dev/null
+++ b/tests/tasks/test_func_wrap.py
@@ -0,0 +1,169 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test module about function wrap task decorator."""
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.tasks.func_wrap import task
+from tests.testing.decorator import foo as foo_decorator
+from tests.testing.task import Task
+
+PD_NAME = "test_process_definition"
+TASK_NAME = "test_task"
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
+)
+def test_single_task_outside(mock_code):
+ """Test single decorator task which outside process definition."""
+
+ @task
+ def foo():
+ print(TASK_NAME)
+
+ with ProcessDefinition(PD_NAME) as pd:
+ foo()
+
+ assert pd is not None and pd.name == PD_NAME
+ assert len(pd.tasks) == 1
+
+ pd_task = pd.tasks[12345]
+ assert pd_task.name == "foo"
+ assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()"
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
+)
+def test_single_task_inside(mock_code):
+ """Test single decorator task which inside process definition."""
+ with ProcessDefinition(PD_NAME) as pd:
+
+ @task
+ def foo():
+ print(TASK_NAME)
+
+ foo()
+
+ assert pd is not None and pd.name == PD_NAME
+ assert len(pd.tasks) == 1
+
+ pd_task = pd.tasks[12345]
+ assert pd_task.name == "foo"
+ assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()"
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
+)
+def test_addition_decorator_error(mock_code):
+ """Test error when using task decorator to a function already have decorator."""
+
+ @task
+ @foo_decorator
+ def foo():
+ print(TASK_NAME)
+
+ with ProcessDefinition(PD_NAME) as pd: # noqa: F841
+ with pytest.raises(
+ PyDSParamException, match="Do no support other decorators for.*"
+ ):
+ foo()
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
+)
+def test_multiple_tasks_outside(mock_code):
+ """Test multiple decorator tasks which outside process definition."""
+
+ @task
+ def foo():
+ print(TASK_NAME)
+
+ @task
+ def bar():
+ print(TASK_NAME)
+
+ with ProcessDefinition(PD_NAME) as pd:
+ foo = foo()
+ bar = bar()
+
+ foo >> bar
+
+ assert pd is not None and pd.name == PD_NAME
+ assert len(pd.tasks) == 2
+
+ task_foo = pd.get_one_task_by_name("foo")
+ task_bar = pd.get_one_task_by_name("bar")
+ assert set(pd.task_list) == {task_foo, task_bar}
+ assert (
+ task_foo is not None
+ and task_foo._upstream_task_codes == set()
+ and task_foo._downstream_task_codes.pop() == task_bar.code
+ )
+ assert (
+ task_bar is not None
+ and task_bar._upstream_task_codes.pop() == task_foo.code
+ and task_bar._downstream_task_codes == set()
+ )
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
+)
+def test_multiple_tasks_inside(mock_code):
+ """Test multiple decorator tasks which inside process definition."""
+ with ProcessDefinition(PD_NAME) as pd:
+
+ @task
+ def foo():
+ print(TASK_NAME)
+
+ @task
+ def bar():
+ print(TASK_NAME)
+
+ foo = foo()
+ bar = bar()
+
+ foo >> bar
+
+ assert pd is not None and pd.name == PD_NAME
+ assert len(pd.tasks) == 2
+
+ task_foo = pd.get_one_task_by_name("foo")
+ task_bar = pd.get_one_task_by_name("bar")
+ assert set(pd.task_list) == {task_foo, task_bar}
+ assert (
+ task_foo is not None
+ and task_foo._upstream_task_codes == set()
+ and task_foo._downstream_task_codes.pop() == task_bar.code
+ )
+ assert (
+ task_bar is not None
+ and task_bar._upstream_task_codes.pop() == task_foo.code
+ and task_bar._downstream_task_codes == set()
+ )
diff --git a/tests/tasks/test_python.py b/tests/tasks/test_python.py
index dbcd298..1cdd85d 100644
--- a/tests/tasks/test_python.py
+++ b/tests/tasks/test_python.py
@@ -26,11 +26,15 @@ from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.tasks.python import Python
+def foo(): # noqa: D103
+ print("hello world.")
+
+
@pytest.mark.parametrize(
"attr, expect",
[
(
- {"code": "print(1)"},
+ {"definition": "print(1)"},
{
"rawScript": "print(1)",
"localParams": [],
@@ -39,7 +43,29 @@ from pydolphinscheduler.tasks.python import Python
"waitStartTimeout": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
},
- )
+ ),
+ (
+ {"definition": "def foo():\n print('I am foo')"},
+ {
+ "rawScript": "def foo():\n print('I am foo')\nfoo()",
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "waitStartTimeout": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ },
+ ),
+ (
+ {"definition": foo},
+ {
+ "rawScript": 'def foo(): # noqa: D103\n print("hello world.")\nfoo()',
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "waitStartTimeout": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ },
+ ),
],
)
@patch(
@@ -66,15 +92,13 @@ def test_property_task_params(mock_code_version, attr, expect):
def test_python_task_not_support_code(mock_code, script_code):
"""Test python task parameters."""
name = "not_support_code_type"
- with pytest.raises(PyDSParamException, match="Parameter code do not support .*?"):
+ with pytest.raises(
+ PyDSParamException, match="Parameter definition do not support .*?"
+ ):
task = Python(name, script_code)
task.raw_script
-def foo(): # noqa: D103
- print("hello world.")
-
-
@pytest.mark.parametrize(
"name, script_code, raw",
[
@@ -82,7 +106,7 @@ def foo(): # noqa: D103
(
"function_define",
foo,
- 'def foo(): # noqa: D103\n print("hello world.")\n',
+ 'def foo(): # noqa: D103\n print("hello world.")\nfoo()',
),
],
)
diff --git a/tests/tasks/test_sql.py b/tests/tasks/test_sql.py
index 3f8209c..ee0acc4 100644
--- a/tests/tasks/test_sql.py
+++ b/tests/tasks/test_sql.py
@@ -26,24 +26,38 @@ from pydolphinscheduler.tasks.sql import Sql, SqlType
@pytest.mark.parametrize(
- "sql, sql_type",
+ "sql, param_sql_type, sql_type",
[
- ("select 1", SqlType.SELECT),
- (" select 1", SqlType.SELECT),
- (" select 1 ", SqlType.SELECT),
- (" select 'insert' ", SqlType.SELECT),
- (" select 'insert ' ", SqlType.SELECT),
- ("with tmp as (select 1) select * from tmp ", SqlType.SELECT),
- ("insert into table_name(col1, col2) value (val1, val2)", SqlType.NOT_SELECT),
+ ("select 1", None, SqlType.SELECT),
+ (" select 1", None, SqlType.SELECT),
+ (" select 1 ", None, SqlType.SELECT),
+ (" select 'insert' ", None, SqlType.SELECT),
+ (" select 'insert ' ", None, SqlType.SELECT),
+ ("with tmp as (select 1) select * from tmp ", None, SqlType.SELECT),
+ (
+ "insert into table_name(col1, col2) value (val1, val2)",
+ None,
+ SqlType.NOT_SELECT,
+ ),
(
"insert into table_name(select, col2) value ('select', val2)",
+ None,
+ SqlType.NOT_SELECT,
+ ),
+ ("update table_name SET col1=val1 where col1=val2", None, SqlType.NOT_SELECT),
+ (
+ "update table_name SET col1='select' where col1=val2",
+ None,
SqlType.NOT_SELECT,
),
- ("update table_name SET col1=val1 where col1=val2", SqlType.NOT_SELECT),
- ("update table_name SET col1='select' where col1=val2", SqlType.NOT_SELECT),
- ("delete from table_name where id < 10", SqlType.NOT_SELECT),
- ("delete from table_name where id < 10", SqlType.NOT_SELECT),
- ("alter table table_name add column col1 int", SqlType.NOT_SELECT),
+ ("delete from table_name where id < 10", None, SqlType.NOT_SELECT),
+ ("delete from table_name where id < 10", None, SqlType.NOT_SELECT),
+ ("alter table table_name add column col1 int", None, SqlType.NOT_SELECT),
+ ("create table table_name2 (col1 int)", None, SqlType.NOT_SELECT),
+ ("create table table_name2 (col1 int)", SqlType.SELECT, SqlType.SELECT),
+ ("select 1", SqlType.NOT_SELECT, SqlType.NOT_SELECT),
+ ("create table table_name2 (col1 int)", SqlType.NOT_SELECT, SqlType.NOT_SELECT),
+ ("select 1", SqlType.SELECT, SqlType.SELECT),
],
)
@patch(
@@ -54,11 +68,13 @@ from pydolphinscheduler.tasks.sql import Sql, SqlType
"pydolphinscheduler.core.database.Database.get_database_info",
return_value=({"id": 1, "type": "mock_type"}),
)
-def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
+def test_get_sql_type(
+ mock_datasource, mock_code_version, sql, param_sql_type, sql_type
+):
"""Test property sql_type could return correct type."""
name = "test_get_sql_type"
datasource_name = "test_datasource"
- task = Sql(name, datasource_name, sql)
+ task = Sql(name, datasource_name, sql, sql_type=param_sql_type)
assert (
sql_type == task.sql_type
), f"Sql {sql} expect sql type is {sql_type} but got {task.sql_type}"
@@ -73,7 +89,7 @@ def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
"sql": "select 1",
"type": "MYSQL",
"datasource": 1,
- "sqlType": SqlType.SELECT,
+ "sqlType": "0",
"preStatements": [],
"postStatements": [],
"displayRows": 10,
@@ -122,7 +138,7 @@ def test_sql_get_define(mock_datasource):
"type": "MYSQL",
"datasource": 1,
"sql": command,
- "sqlType": SqlType.SELECT,
+ "sqlType": "0",
"displayRows": 10,
"preStatements": [],
"postStatements": [],
diff --git a/tests/testing/cli.py b/tests/testing/cli.py
new file mode 100644
index 0000000..0d2c1d1
--- /dev/null
+++ b/tests/testing/cli.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Utils of command line test."""
+
+
+from click.testing import CliRunner
+
+from tests.testing.constants import DEV_MODE
+
+
+class CliTestWrapper:
+ """Wrap command click CliRunner.invoke."""
+
+ def __init__(self, *args, **kwargs):
+ runner = CliRunner()
+ self.result = runner.invoke(*args, **kwargs)
+ self.show_result_output()
+
+ def _assert_output(self, output: str = None, fuzzy: bool = False):
+ """Assert between `CliRunner.invoke.result.output` and parameter `output`.
+
+ :param output: The output will check compare to the ``CliRunner.invoke.output``.
+ :param fuzzy: A flag define whether assert :param:`output` in fuzzy or not.
+ Check if `CliRunner.invoke.output` contain :param:`output` is set ``True``
+ and CliRunner.invoke.output equal to :param:`output` if we set it ``False``.
+ """
+ if not output:
+ return
+ if fuzzy:
+ assert output in self.result.output
+ else:
+ assert self.result.output.rstrip("\n") == output
+
+ def show_result_output(self):
+ """Print `CliRunner.invoke.result` output content in debug mode.
+
+ It read variable named `PY_DOLPHINSCHEDULER_DEV_MODE` from env, when it set to `true` or `t` or `1`
+ will print result output when class :class:`CliTestWrapper` is initialization.
+ """
+ if DEV_MODE:
+ print(f"\n{self.result.output}\n")
+
+ def assert_success(self, output: str = None, fuzzy: bool = False):
+ """Assert test is success.
+
+ It would check whether `CliRunner.invoke.exit_code` equals to `0`, with no
+ exception at the same time. It's also can test the content of `CliRunner.invoke.output`.
+
+ :param output: The output will check compare to the ``CliRunner.invoke.output``.
+ :param fuzzy: A flag define whether assert :param:`output` in fuzzy or not.
+ Check if `CliRunner.invoke.output` contain :param:`output` is set ``True``
+ and CliRunner.invoke.output equal to :param:`output` if we set it ``False``.
+ """
+ assert self.result.exit_code == 0
+ if self.result.exception:
+ raise self.result.exception
+ self._assert_output(output, fuzzy)
+
+ def assert_fail(self, ret_code: int, output: str = None, fuzzy: bool = False):
+ """Assert test is fail.
+
+ It would check whether `CliRunner.invoke.exit_code` equals to :param:`ret_code`,
+ and it will also can test the content of `CliRunner.invoke.output`.
+
+ :param ret_code: The returning code of this fail test.
+ :param output: The output will check compare to the ``CliRunner.invoke.output``.
+ :param fuzzy: A flag define whether assert :param:`output` in fuzzy or not.
+ Check if `CliRunner.invoke.output` contain :param:`output` is set ``True``
+ and CliRunner.invoke.output equal to :param:`output` if we set it ``False``.
+ """
+ assert ret_code == self.result.exit_code
+ self._assert_output(output, fuzzy)
diff --git a/tests/testing/constants.py b/tests/testing/constants.py
index 1552192..ed2ee37 100644
--- a/tests/testing/constants.py
+++ b/tests/testing/constants.py
@@ -17,6 +17,8 @@
"""Constants variables for test module."""
+import os
+
# Record some task without example in directory `example`. Some of them maybe can not write example,
# but most of them just without adding by mistake, and we should add it later.
task_without_example = {
@@ -26,3 +28,21 @@ task_without_example = {
"python",
"procedure",
}
+
+# The examples we skip executing in tests. Those examples can not be run directly because they need other
+# support such as resource files, data sources, etc. But we should try to run them later for more coverage
+ignore_exec_examples = {
+ "task_datax_example",
+ "task_flink_example",
+ "task_map_reduce_example",
+ "task_spark_example",
+}
+
+# pydolphinscheduler environment home
+ENV_PYDS_HOME = "PYDS_HOME"
+
+# whether in dev mode; if true we will add or remove some tests, or maybe show more detailed info when
+# a test fails.
+DEV_MODE = str(
+ os.environ.get("PY_DOLPHINSCHEDULER_DEV_MODE", False)
+).strip().lower() in {"true", "t", "1"}
diff --git a/src/pydolphinscheduler/core/__init__.py b/tests/testing/decorator.py
similarity index 73%
copy from src/pydolphinscheduler/core/__init__.py
copy to tests/testing/decorator.py
index 31dc944..78078ee 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/tests/testing/decorator.py
@@ -15,14 +15,18 @@
# specific language governing permissions and limitations
# under the License.
-"""Init pydolphinscheduler.core package."""
+"""Decorator module for testing module."""
-from pydolphinscheduler.core.database import Database
-from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.task import Task
+import types
+from functools import wraps
-__all__ = [
- "ProcessDefinition",
- "Task",
- "Database",
-]
+
+def foo(func: types.FunctionType):
+ """Decorate which do nothing for testing module."""
+
+ @wraps(func)
+ def wrapper():
+ print("foo decorator called.")
+ func()
+
+ return wrapper
diff --git a/tests/testing/docker_wrapper.py b/tests/testing/docker_wrapper.py
new file mode 100644
index 0000000..a3d0b6e
--- /dev/null
+++ b/tests/testing/docker_wrapper.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Wrap docker commands for easier create docker container."""
+
+import time
+from typing import Optional
+
+import docker
+from docker.errors import ImageNotFound
+from docker.models.containers import Container
+
+
+class DockerWrapper:
+ """Wrap docker commands for easier create docker container.
+
+ :param image: The image to create docker container.
+ """
+
+ def __init__(self, image: str, container_name: str):
+ self._client = docker.from_env()
+ self.image = image
+ self.container_name = container_name
+
+ def run(self, *args, **kwargs) -> Container:
+ """Create and run a new container.
+
+ This method would return immediately after the container started, if you wish it return container
+ object when specific service start, you could see :func:`run_until_log` which return container
+ object when specific output log appear in docker.
+ """
+ if not self.images_exists:
+ raise ValueError("Docker image named %s do not exists.", self.image)
+ return self._client.containers.run(
+ image=self.image, name=self.container_name, detach=True, *args, **kwargs
+ )
+
+ def run_until_log(
+ self, log: str, remove_exists: Optional[bool] = True, *args, **kwargs
+ ) -> Container:
+ """Create and run a new container, return when specific log appear.
+
+ It will call :func:`run` inside this method. And after container started, it would not
+ return it immediately but run command `docker logs` to see whether specific log appear.
+ It will raise `RuntimeError` if the specific log does not appear within 10 minutes.
+ """
+ if remove_exists:
+ self.remove_container()
+
+ log_byte = str.encode(log)
+ container = self.run(*args, **kwargs)
+
+ timeout_threshold = 10 * 60
+ start_time = time.time()
+ while time.time() <= start_time + timeout_threshold:
+ if log_byte in container.logs(tail=1000):
+ break
+ time.sleep(2)
+ # Stop container and raise error when reach timeout threshold but do not appear specific log output
+ else:
+ container.remove(force=True)
+ raise RuntimeError(
+ "Can not capture specific log `%s` in %d seconds, remove container.",
+ (log, timeout_threshold),
+ )
+ return container
+
+ def remove_container(self):
+ """Remove container which already running."""
+ containers = self._client.containers.list(
+ all=True, filters={"name": self.container_name}
+ )
+ if containers:
+ for container in containers:
+ container.remove(force=True)
+
+ @property
+ def images_exists(self) -> bool:
+ """Check whether the image exists in local docker repository or not."""
+ try:
+ self._client.images.get(self.image)
+ return True
+ except ImageNotFound:
+ return False
diff --git a/tests/testing/task.py b/tests/testing/file.py
similarity index 63%
copy from tests/testing/task.py
copy to tests/testing/file.py
index e0affc9..82e0837 100644
--- a/tests/testing/task.py
+++ b/tests/testing/file.py
@@ -15,18 +15,20 @@
# specific language governing permissions and limitations
# under the License.
-"""Mock class Task for other test."""
+"""Testing util about file operating."""
-import uuid
+from pathlib import Path
+from typing import Union
-from pydolphinscheduler.core.task import Task as SourceTask
+def get_file_content(path: Union[str, Path]) -> str:
+ """Get file content in given path."""
+ with open(path, mode="r") as f:
+ return f.read()
-class Task(SourceTask):
- """Mock class :class:`pydolphinscheduler.core.task.Task` for unittest."""
- DEFAULT_VERSION = 1
-
- def gen_code_and_version(self):
- """Mock java gateway code and version, convenience method for unittest."""
- return uuid.uuid1().time, self.DEFAULT_VERSION
+def delete_file(path: Union[str, Path]) -> None:
+ """Delete file in given path."""
+ path = Path(path).expanduser() if isinstance(path, str) else path.expanduser()
+ if path.exists():
+ path.unlink()
diff --git a/tests/testing/path.py b/tests/testing/path.py
index 2e75be2..d1e520b 100644
--- a/tests/testing/path.py
+++ b/tests/testing/path.py
@@ -20,13 +20,14 @@
from pathlib import Path
from typing import Any, Generator
-path_code_tasks = Path(__file__).parent.parent.parent.joinpath(
- "src", "pydolphinscheduler", "tasks"
-)
-path_example = Path(__file__).parent.parent.parent.joinpath(
- "src", "pydolphinscheduler", "examples"
+project_root = Path(__file__).parent.parent.parent
+
+path_code_tasks = project_root.joinpath("src", "pydolphinscheduler", "tasks")
+path_example = project_root.joinpath("src", "pydolphinscheduler", "examples")
+path_doc_tasks = project_root.joinpath("docs", "source", "tasks")
+path_default_config_yaml = project_root.joinpath(
+ "src", "pydolphinscheduler", "core", "default_config.yaml"
)
-path_doc_tasks = Path(__file__).parent.parent.parent.joinpath("docs", "source", "tasks")
def get_all_examples() -> Generator[Path, Any, None]:
diff --git a/tests/testing/task.py b/tests/testing/task.py
index e0affc9..11ffbf1 100644
--- a/tests/testing/task.py
+++ b/tests/testing/task.py
@@ -30,3 +30,18 @@ class Task(SourceTask):
def gen_code_and_version(self):
"""Mock java gateway code and version, convenience method for unittest."""
return uuid.uuid1().time, self.DEFAULT_VERSION
+
+
+class TaskWithCode(SourceTask):
+ """Mock class :class:`pydolphinscheduler.core.task.Task` and it return some code and version."""
+
+ def __init__(
+ self, name: str, task_type: str, code: int, version: int, *args, **kwargs
+ ):
+ self._constant_code = code
+ self._constant_version = version
+ super().__init__(name, task_type, *args, **kwargs)
+
+ def gen_code_and_version(self):
+ """Mock java gateway code and version, convenience method for unittest."""
+ return self._constant_code, self._constant_version
diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py
new file mode 100644
index 0000000..4cc6df4
--- /dev/null
+++ b/tests/utils/test_file.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test file utils."""
+
+import shutil
+from pathlib import Path
+
+import pytest
+
+from pydolphinscheduler.utils import file
+from tests.testing.file import delete_file, get_file_content
+
+content = "test_content"
+file_path = "/tmp/test/file/test_file_write.txt"
+
+
+@pytest.fixture
+def teardown_del_file():
+ """Teardown about delete file."""
+ yield
+ delete_file(file_path)
+
+
+@pytest.fixture
+def setup_crt_first():
+ """Set up and teardown about create file first and then delete it."""
+ file.write(content=content, to_path=file_path)
+ yield
+ delete_file(file_path)
+
+
+def test_write_content(teardown_del_file):
+ """Test function :func:`write` on write behavior with correct content."""
+ assert not Path(file_path).exists()
+ file.write(content=content, to_path=file_path)
+ assert Path(file_path).exists()
+ assert content == get_file_content(file_path)
+
+
+def test_write_not_create_parent(teardown_del_file):
+ """Test function :func:`write` with parent not exists and do not create path."""
+ file_test_dir = Path(file_path).parent
+ if file_test_dir.exists():
+ shutil.rmtree(str(file_test_dir))
+ assert not file_test_dir.exists()
+ with pytest.raises(
+ ValueError,
+ match="Parent directory do not exists and set param `create` to `False`",
+ ):
+ file.write(content=content, to_path=file_path, create=False)
+
+
+def test_write_overwrite(setup_crt_first):
+ """Test success with file exists but set ``True`` to overwrite."""
+ assert Path(file_path).exists()
+
+ new_content = f"new_{content}"
+ file.write(content=new_content, to_path=file_path, overwrite=True)
+ assert new_content == get_file_content(file_path)
+
+
+def test_write_overwrite_error(setup_crt_first):
+ """Test error with file exists but set ``False`` to overwrite."""
+ assert Path(file_path).exists()
+
+ new_content = f"new_{content}"
+ with pytest.raises(
+ FileExistsError, match=".*already exists and you choose not overwrite mode\\."
+ ):
+ file.write(content=new_content, to_path=file_path)
diff --git a/tests/utils/test_yaml_parser.py b/tests/utils/test_yaml_parser.py
new file mode 100644
index 0000000..ad3aaf7
--- /dev/null
+++ b/tests/utils/test_yaml_parser.py
@@ -0,0 +1,255 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test utils.path_dict module."""
+
+from typing import Dict
+
+import pytest
+from ruamel.yaml import YAML
+
+from pydolphinscheduler.utils.yaml_parser import YamlParser
+from tests.testing.path import path_default_config_yaml
+
# Round-trip loader; ``yaml.load("no need test")`` below produces the plain
# string ``"no need test"`` used as a skip sentinel.
yaml = YAML()

# Expected values per parametrized YAML document: dotted key path ->
# ``(value_before_edit, value_after_edit)``.  Entries set to the sentinel
# ``yaml.load("no need test")`` are container nodes the tests skip, because
# keeping their full nested value in sync here would be too hard to maintain.
expects = [
    {
        # Sentinel ``yaml.load("no need test")`` marks keys skipped by the tests.
        "name": yaml.load("no need test"),
        "name.family": ("Smith", "SmithEdit"),
        "name.given": ("Alice", "AliceEdit"),
        "name.mark": yaml.load("no need test"),
        "name.mark.name_mark": yaml.load("no need test"),
        "name.mark.name_mark.key": ("value", "valueEdit"),
    },
    {
        # Sentinel ``yaml.load("no need test")`` marks keys skipped by the tests.
        "java_gateway": yaml.load("no need test"),
        "java_gateway.address": ("127.0.0.1", "127.1.1.1"),
        "java_gateway.port": (25333, 25555),
        "java_gateway.auto_convert": (True, False),
        "default": yaml.load("no need test"),
        "default.user": yaml.load("no need test"),
        "default.user.name": ("userPythonGateway", "userPythonGatewayEdit"),
        "default.user.password": ("userPythonGateway", "userPythonGatewayEdit"),
        "default.user.email": (
            "userPythonGateway@dolphinscheduler.com",
            "userEdit@dolphinscheduler.com",
        ),
        "default.user.tenant": ("tenant_pydolphin", "tenant_pydolphinEdit"),
        "default.user.phone": (11111111111, 22222222222),
        "default.user.state": (1, 0),
        "default.workflow": yaml.load("no need test"),
        "default.workflow.project": ("project-pydolphin", "project-pydolphinEdit"),
        "default.workflow.tenant": ("tenant_pydolphin", "SmithEdit"),
        "default.workflow.user": ("userPythonGateway", "SmithEdit"),
        "default.workflow.queue": ("queuePythonGateway", "queueEdit"),
        "default.workflow.worker_group": ("default", "wgEdit"),
        "default.workflow.release_state": ("online", "offline"),
        "default.workflow.time_zone": ("Asia/Shanghai", "Europe/Amsterdam"),
        "default.workflow.warning_type": ("NONE", "SUCCESS"),
    },
]
+
# Parametrized YAML source documents: a small handwritten example, plus the
# package's shipped default configuration file appended below.
# NOTE(review): the exact whitespace of this literal matters — the round-trip
# parser preserves it and ``test_yaml_parser_str_repr`` compares against it.
param = [
    """#example
name:
  # details
  family: Smith # very common
  given: Alice # one of the siblings
  mark:
    name_mark:
      key: value
"""
]

# Append the default config YAML shipped with the package as the second case.
with open(path_default_config_yaml, "r") as f:
    param.append(f.read())
+
+
@pytest.mark.parametrize(
    "src, delimiter, expect",
    [
        (
            param[0],
            "|",
            expects[0],
        ),
        (
            param[1],
            "/",
            expects[1],
        ),
    ],
)
def test_yaml_parser_specific_delimiter(src: str, delimiter: str, expect: Dict):
    """Test specific delimiter for :class:`YamlParser`.

    Verifies both ``obj[key]`` and ``obj.get(key)`` access for every
    non-sentinel key, after translating the dotted key paths in ``expects``
    to the delimiter under test.
    """

    def ch_dl(key):
        # Translate the canonical dotted key path to the delimiter under test.
        return key.replace(".", delimiter)

    yaml_parser = YamlParser(src, delimiter=delimiter)
    for key, val in expect.items():
        # ``"no need test"`` is the sentinel for skipped container keys.
        if val == "no need test":
            continue
        # Per-key asserts (instead of ``assert all([...])``) so a failure
        # names the offending key, no throwaway list is built, and the test
        # cannot pass vacuously if the filter excluded every key.
        assert val[0] == yaml_parser[ch_dl(key)]
        assert val[0] == yaml_parser.get(ch_dl(key))
+
+
@pytest.mark.parametrize(
    "src, expect",
    [
        (
            param[0],
            expects[0],
        ),
        (
            param[1],
            expects[1],
        ),
    ],
)
def test_yaml_parser_contains(src: str, expect: Dict):
    """Test magic function :func:`YamlParser.__contain__` also with `key in obj` syntax."""
    yaml_parser = YamlParser(src)
    # Compare key-set sizes first so extra parser keys are caught even when
    # every expected key is present.
    assert len(expect.keys()) == len(
        yaml_parser.dict_parser.keys()
    ), "Parser keys length not equal to expect keys length"
    # Per-key asserts (instead of ``assert all([...])``) so a failure names
    # the missing key rather than reporting a bare ``False``.
    for key in expect:
        assert key in yaml_parser, "Parser keys not equal to expect keys"
+
+
@pytest.mark.parametrize(
    "src, expect",
    [
        (
            param[0],
            expects[0],
        ),
        (
            param[1],
            expects[1],
        ),
    ],
)
def test_yaml_parser_get(src: str, expect: Dict):
    """Test magic function :func:`YamlParser.__getitem__` also with `obj[key]` syntax.

    Verifies both ``obj[key]`` and ``obj.get(key)`` access for every
    non-sentinel key.
    """
    yaml_parser = YamlParser(src)
    for key, val in expect.items():
        # ``"no need test"`` is the sentinel for skipped container keys.
        if val == "no need test":
            continue
        # Per-key asserts (instead of ``assert all([...])``) so a failure
        # names the offending key, no throwaway list is built, and the test
        # cannot pass vacuously if the filter excluded every key.
        assert val[0] == yaml_parser[key]
        assert val[0] == yaml_parser.get(key)
+
+
@pytest.mark.parametrize(
    "src, expect",
    [
        (
            param[0],
            expects[0],
        ),
        (
            param[1],
            expects[1],
        ),
    ],
)
def test_yaml_parser_set(src: str, expect: Dict):
    """Test magic function :func:`YamlParser.__setitem__` also with `obj[key] = val` syntax.

    For every non-sentinel key: the parser initially holds the "before"
    value, and after assignment it holds the "after" value.
    """
    yaml_parser = YamlParser(src)
    for key, val in expect.items():
        assert key in yaml_parser.dict_parser.keys()
        # ``"no need test"`` is the sentinel for skipped container keys.
        if val == "no need test":
            continue
        before, after = val
        assert before == yaml_parser.dict_parser[key]
        assert after != yaml_parser.dict_parser[key]

        yaml_parser[key] = after
        assert before != yaml_parser.dict_parser[key]
        assert after == yaml_parser.dict_parser[key]
+
+
@pytest.mark.parametrize(
    "src, setter, expect",
    [
        (
            param[0],
            {"name.mark.name_mark.key": "edit"},
            """#example
name:
  # details
  family: Smith # very common
  given: Alice # one of the siblings
  mark:
    name_mark:
      key: edit
""",
        ),
        (
            param[0],
            {
                "name.family": "SmithEdit",
                "name.given": "AliceEdit",
                "name.mark.name_mark.key": "edit",
            },
            """#example
name:
  # details
  family: SmithEdit # very common
  given: AliceEdit # one of the siblings
  mark:
    name_mark:
      key: edit
""",
        ),
    ],
)
def test_yaml_parser_str_repr(src: str, setter: Dict, expect: str):
    """Test function :func:`YamlParser.to_string`.

    Round-trips the source through the parser: ``str()``/``repr()`` must
    match the input before any edit, and after applying ``setter`` they must
    match ``expect`` — preserving comments and formatting of untouched lines.
    """
    yaml_parser = YamlParser(src)

    # Equal before change
    assert f"YamlParser({src})" == repr(yaml_parser)
    assert src == str(yaml_parser)

    for key, val in setter.items():
        yaml_parser[key] = val

    # Equal after changed
    assert expect == str(yaml_parser)
    assert f"YamlParser({expect})" == repr(yaml_parser)
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..8e9280f
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
[tox]
# Environments run when ``tox`` is invoked with no ``-e``; CI usually picks
# individual envs instead.
envlist = local-ci, auto-lint, lint, doc-build-test, code-test, integrate-test, py{36,37,38,39}

[testenv]
# ``make`` is invoked by the doc-build-test env.
# NOTE(review): ``whitelist_externals`` is the pre-tox-3.18 spelling of
# ``allowlist_externals`` — confirm the tox version in use still accepts it.
whitelist_externals = make

# Auto-format the code base in place.
[testenv:auto-lint]
extras = style
commands =
    python -m isort .
    python -m black .
    python -m autoflake --in-place --remove-all-unused-imports --ignore-init-module-imports --recursive .

# Check-only variant of auto-lint; fails without modifying files.
[testenv:lint]
extras = style
commands =
    python -m isort --check .
    python -m black --check .
    python -m flake8
    python -m autoflake --remove-all-unused-imports --ignore-init-module-imports --check --recursive .

[testenv:code-test]
extras = test
# Run both tests and coverage
commands =
    python -m pytest --cov=pydolphinscheduler --cov-config={toxinidir}/.coveragerc tests/

# Ensure the Sphinx documentation builds from a clean state.
[testenv:doc-build-test]
extras = doc
commands =
    make -C {toxinidir}/docs clean
    make -C {toxinidir}/docs html

# Integration tests only (expect an external DolphinScheduler to talk to).
[testenv:integrate-test]
extras = test
commands =
    python -m pytest tests/integration/

# Convenience env mirroring the CI pipeline for local runs.
[testenv:local-ci]
extras = dev
commands =
    {[testenv:lint]commands}
    {[testenv:code-test]commands}
    {[testenv:doc-build-test]commands}