Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2022/01/05 11:57:38 UTC

[dolphinscheduler] branch dev updated: [python] Enhance task datax example (#7801)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new fcbb5f4  [python] Enhance task datax example (#7801)
fcbb5f4 is described below

commit fcbb5f4d8f337236de1aab17a2d1f00b51c2d535
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Wed Jan 5 19:57:28 2022 +0800

    [python] Enhance task datax example (#7801)
    
    * [python] Enhance task datax example
    
    * Add full example for `CustomDataX.json`
    * Add comment noting that the datasource needs to exist.
    
    close: #7800
    
    * Add missing parameter setting
---
 .../examples/task_datax_example.py                 | 50 ++++++++++++++++++++--
 1 file changed, 47 insertions(+), 3 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
index 1832921..cdc7f0f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
@@ -29,7 +29,48 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.tasks.datax import CustomDataX, DataX
 
 # datax json template
-JSON_TEMPLATE = ""
+JSON_TEMPLATE = {
+    "job": {
+        "content": [
+            {
+                "reader": {
+                    "name": "mysqlreader",
+                    "parameter": {
+                        "username": "usr",
+                        "password": "pwd",
+                        "column": ["id", "name", "code", "description"],
+                        "splitPk": "id",
+                        "connection": [
+                            {
+                                "table": ["source_table"],
+                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/source_db"],
+                            }
+                        ],
+                    },
+                },
+                "writer": {
+                    "name": "mysqlwriter",
+                    "parameter": {
+                        "writeMode": "insert",
+                        "username": "usr",
+                        "password": "pwd",
+                        "column": ["id", "name"],
+                        "connection": [
+                            {
+                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
+                                "table": ["target_table"],
+                            }
+                        ],
+                    },
+                },
+            }
+        ],
+        "setting": {
+            "errorLimit": {"percentage": 0, "record": 0},
+            "speed": {"channel": 1, "record": 1000},
+        },
+    }
+}
 
 with ProcessDefinition(
     name="task_datax_example",
@@ -37,6 +78,8 @@ with ProcessDefinition(
 ) as pd:
     # This task synchronizes the data in `t_ds_project`
     # of `first_mysql` database to `target_project` of `second_mysql` database.
+    # You have to make sure the data sources named `first_mysql` and
+    # `second_mysql` exist in your environment.
     task1 = DataX(
         name="task_datax",
         datasource_name="first_mysql",
@@ -45,6 +88,7 @@ with ProcessDefinition(
         target_table="target_table",
     )
 
-    # you can custom json_template of datax to sync data.
-    task2 = CustomDataX(name="task_custom_datax", json=JSON_TEMPLATE)
+    # You can customize the json_template of DataX to sync data. This task creates
+    # a DataX job just like task1, transferring records from `first_mysql` to `second_mysql`.
+    task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
     pd.run()
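
A note on the `CustomDataX` example above: passing the template through `str()`
yields Python's dict repr, which uses single quotes and therefore is not valid
JSON, while DataX expects a genuine JSON document. A safer serialization is
`json.dumps` from the standard library. A minimal sketch, assuming the
`JSON_TEMPLATE` dict defined in the example above (the `indent` argument is
purely cosmetic):

    import json

    from pydolphinscheduler.tasks.datax import CustomDataX

    # json.dumps emits valid JSON (double quotes, proper escaping),
    # unlike str(), which produces a Python dict repr.
    task2 = CustomDataX(
        name="task_custom_datax",
        json=json.dumps(JSON_TEMPLATE, indent=4),
    )
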