You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2022/01/05 11:57:38 UTC
[dolphinscheduler] branch dev updated: [python] Enhance task datax example (#7801)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new fcbb5f4 [python] Enhance task datax example (#7801)
fcbb5f4 is described below
commit fcbb5f4d8f337236de1aab17a2d1f00b51c2d535
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Wed Jan 5 19:57:28 2022 +0800
[python] Enhance task datax example (#7801)
* [python] Enhance task datax example
* Add full example for `CustomDataX.json`
* Add comment about datasource need to exists.
close: #7800
* Add missing parameter setting
---
.../examples/task_datax_example.py | 50 ++++++++++++++++++++--
1 file changed, 47 insertions(+), 3 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
index 1832921..cdc7f0f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
@@ -29,7 +29,48 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.datax import CustomDataX, DataX
# datax json template
-JSON_TEMPLATE = ""
+JSON_TEMPLATE = {
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "mysqlreader",
+ "parameter": {
+ "username": "usr",
+ "password": "pwd",
+ "column": ["id", "name", "code", "description"],
+ "splitPk": "id",
+ "connection": [
+ {
+ "table": ["source_table"],
+ "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/source_db"],
+ }
+ ],
+ },
+ },
+ "writer": {
+ "name": "mysqlwriter",
+ "parameter": {
+ "writeMode": "insert",
+ "username": "usr",
+ "password": "pwd",
+ "column": ["id", "name"],
+ "connection": [
+ {
+ "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
+ "table": ["target_table"],
+ }
+ ],
+ },
+ },
+ }
+ ],
+ "setting": {
+ "errorLimit": {"percentage": 0, "record": 0},
+ "speed": {"channel": 1, "record": 1000},
+ },
+ }
+}
with ProcessDefinition(
name="task_datax_example",
@@ -37,6 +78,8 @@ with ProcessDefinition(
) as pd:
# This task synchronizes the data in `t_ds_project`
# of `first_mysql` database to `target_project` of `second_mysql` database.
+ # You have to make sure data sources named `first_mysql` and `second_mysql` exist
+ # in your environment.
task1 = DataX(
name="task_datax",
datasource_name="first_mysql",
@@ -45,6 +88,7 @@ with ProcessDefinition(
target_table="target_table",
)
- # you can custom json_template of datax to sync data.
- task2 = CustomDataX(name="task_custom_datax", json=JSON_TEMPLATE)
+ # You can customize the json_template of DataX to sync data. This task creates a new
+ # DataX job, same as task1, transferring records from `first_mysql` to `second_mysql`
+ task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
pd.run()