Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/05/23 09:14:09 UTC

[GitHub] [incubator-inlong-website] thexiay commented on a diff in pull request #381: [INLONG-380][Sort] Add lightweight sort and transform related instructions

thexiay commented on code in PR #381:
URL: https://github.com/apache/incubator-inlong-website/pull/381#discussion_r879206398


##########
docs/modules/sort/quick_start.md:
##########
@@ -3,64 +3,43 @@ title: Deployment
 sidebar_position: 2
 ---
 
-## Set up flink environment
-Currently inlong-sort is based on flink, before you run an inlong-sort application,
-you need to set up [flink environment](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/overview/).
+## Set up Flink Environment
+Currently InLong-Sort is based on Flink, before you run an InLong-Sort Application,
+you need to set up [Flink Environment](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/overview/).
 
-Currently, inlong-sort relys on flink-1.13.5. Chose `flink-1.13.5-bin-scala_2.11.tgz` when downloading package.
+Currently, InLong-Sort relys on Flink-1.13.5. Chose `flink-1.13.5-bin-scala_2.11.tgz` when downloading package.
 
-Once your flink environment is set up, you can visit web ui of flink, whose address is stored in `/${your_flink_path}/conf/masters`.
+Once your Flink Environment is set up, you can visit Web UI of Flink, whose address is stored in `/${your_flink_path}/conf/masters`.
 
 ## Prepare installation files
 All installation files at `inlong-sort` directory.
 
 ## Starting an inlong-sort application
-Now you can submit job to flink with the jar compiled, refer to [how to submit job to flink](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/cli/#submitting-a-job).
+Now you can submit job to Flink with the jar compiled, refer to [How to submit job to Flink](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/cli/#submitting-a-job).
 
 Example:
 ```
-./bin/flink run -c org.apache.inlong.sort.singletenant.flink.Entrance inlong-sort/sort-[version].jar \
---cluster-id debezium2hive --dataflow.info.file /YOUR_DATAFLOW_INFO_DIR/debezium-to-hive.json \
---source.type pulsar --sink.type hive --sink.hive.rolling-policy.rollover-interval 60000 \
---metrics.audit.proxy.hosts 127.0.0.1:10081 --sink.hive.rolling-policy.check-interval 30000
+./bin/flink run -c org.apache.inlong.sort.Entrance inlong-sort/sort-[version].jar \
+--group.info.file /YOUR_DATAFLOW_INFO_DIR/mysql-to-kafka.json
 ```
 
 Notice:
 
-- `-c org.apache.inlong.sort.singletenant.flink.Entrance` is the main class name
+- `-c org.apache.inlong.sort.Entrance` is the main class name
 
 - `inlong-sort/sort-[version].jar` is the compiled jar
 
 ## Necessary configurations
-- `--cluster-id ` represent a specified inlong-sort application, same as the configuration of `sort.appName` in inlong-manager
-- `--dataflow.info.file` dataflow configuration file path
-- `--source.type` source of the application, currently "pulsar" is supported
-- `--sink.type` sink of the application, currently "clickhouse", "hive", "iceberg", "kafka" are supported
-- `--metrics.audit.proxy.hosts` audit proxy host address for reporting audit metrics
-
+- `--group.info.file` dataflow configuration file path

Review Comment:
   Notice: if we need our extended extract and load connectors, we should put the corresponding connector jars into the `${FLINK_HOME}/lib` folder. For example, to sync MySQL to Hive, we should put `sort-connector-mysql-cdc.jar` and `sort-connector-hive.jar` in `${FLINK_HOME}/lib`.
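The comment above asks for a note about connector jars. The Python sketch below shows one way to check for them. It reads a parsed group info file, lists the connector jars its nodes need, and reports any that are missing from `${FLINK_HOME}/lib`.

Assumptions:
- The `mysqlExtract` to `sort-connector-mysql-cdc.jar` and `hiveLoad` to `sort-connector-hive.jar` pairings are inferred from the comment.
- The Kafka jar name is hypothetical.
- The mapping table and both helper functions are hypothetical.

```python
from pathlib import Path
from typing import Dict, List, Set

# Hypothetical node-type -> connector-jar mapping. The mysqlExtract and
# hiveLoad entries are inferred from the review comment; the Kafka jar
# name is an assumption.
CONNECTOR_JARS: Dict[str, str] = {
    "mysqlExtract": "sort-connector-mysql-cdc.jar",
    "hiveLoad": "sort-connector-hive.jar",
    "kafkaExtract": "sort-connector-kafka.jar",
    "kafkaLoad": "sort-connector-kafka.jar",
}


def required_connector_jars(group_info: dict) -> Set[str]:
    """Collect the connector jars needed by the nodes of every stream."""
    jars: Set[str] = set()
    for stream in group_info.get("streams", []):
        for node in stream.get("nodes", []):
            jar = CONNECTOR_JARS.get(node.get("type"))
            if jar is not None:
                jars.add(jar)
    return jars


def missing_connector_jars(group_info: dict, flink_home: str) -> List[str]:
    """Return the required jars that are not present in ${FLINK_HOME}/lib."""
    lib_dir = Path(flink_home) / "lib"
    return sorted(jar for jar in required_connector_jars(group_info)
                  if not (lib_dir / jar).is_file())
```

A typical call would be `missing_connector_jars(json.load(f), os.environ["FLINK_HOME"])`, where `f` is the opened group info file. Any jar it returns still has to be copied into `lib` before the job is submitted.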



##########
docs/modules/sort/quick_start.md:
##########
@@ -3,64 +3,43 @@ title: Deployment
 sidebar_position: 2
 ---
 
-## Set up flink environment
-Currently inlong-sort is based on flink, before you run an inlong-sort application,
-you need to set up [flink environment](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/overview/).
+## Set up Flink Environment
+Currently InLong-Sort is based on Flink, before you run an InLong-Sort Application,
+you need to set up [Flink Environment](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/overview/).
 
-Currently, inlong-sort relys on flink-1.13.5. Chose `flink-1.13.5-bin-scala_2.11.tgz` when downloading package.
+Currently, InLong-Sort relys on Flink-1.13.5. Chose `flink-1.13.5-bin-scala_2.11.tgz` when downloading package.
 
-Once your flink environment is set up, you can visit web ui of flink, whose address is stored in `/${your_flink_path}/conf/masters`.
+Once your Flink Environment is set up, you can visit Web UI of Flink, whose address is stored in `/${your_flink_path}/conf/masters`.
 
 ## Prepare installation files
 All installation files at `inlong-sort` directory.
 
 ## Starting an inlong-sort application
-Now you can submit job to flink with the jar compiled, refer to [how to submit job to flink](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/cli/#submitting-a-job).
+Now you can submit job to Flink with the jar compiled, refer to [How to submit job to Flink](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/cli/#submitting-a-job).
 
 Example:
 ```
-./bin/flink run -c org.apache.inlong.sort.singletenant.flink.Entrance inlong-sort/sort-[version].jar \
---cluster-id debezium2hive --dataflow.info.file /YOUR_DATAFLOW_INFO_DIR/debezium-to-hive.json \
---source.type pulsar --sink.type hive --sink.hive.rolling-policy.rollover-interval 60000 \
---metrics.audit.proxy.hosts 127.0.0.1:10081 --sink.hive.rolling-policy.check-interval 30000
+./bin/flink run -c org.apache.inlong.sort.Entrance inlong-sort/sort-[version].jar \
+--group.info.file /YOUR_DATAFLOW_INFO_DIR/mysql-to-kafka.json
 ```
 
 Notice:
 
-- `-c org.apache.inlong.sort.singletenant.flink.Entrance` is the main class name
+- `-c org.apache.inlong.sort.Entrance` is the main class name
 
 - `inlong-sort/sort-[version].jar` is the compiled jar
 
 ## Necessary configurations
-- `--cluster-id ` represent a specified inlong-sort application, same as the configuration of `sort.appName` in inlong-manager
-- `--dataflow.info.file` dataflow configuration file path
-- `--source.type` source of the application, currently "pulsar" is supported
-- `--sink.type` sink of the application, currently "clickhouse", "hive", "iceberg", "kafka" are supported
-- `--metrics.audit.proxy.hosts` audit proxy host address for reporting audit metrics
-
+- `--group.info.file` dataflow configuration file path
+- 

Review Comment:
   Redundant `-`
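In the updated hunk, the submission command reduces to three parts: the main class, the compiled jar, and `--group.info.file`. The minimal Python sketch below assembles that argument list. The helper name and the default `./bin/flink` path are illustrative assumptions. The resulting list could be handed to `subprocess.run(args, check=True)`.

```python
from typing import List

# Main class and option name as they appear in the updated quick_start.md diff.
ENTRANCE_CLASS = "org.apache.inlong.sort.Entrance"


def build_flink_run_args(sort_jar: str, group_info_file: str,
                         flink_bin: str = "./bin/flink") -> List[str]:
    """Assemble the `flink run` argument list for an InLong-Sort job.

    After this change, --group.info.file is the only necessary option; the
    removed --cluster-id, --source.type, --sink.type and
    --metrics.audit.proxy.hosts options are not passed.
    """
    return [
        flink_bin, "run",
        "-c", ENTRANCE_CLASS,                  # main class
        sort_jar,                              # compiled jar
        "--group.info.file", group_info_file,  # dataflow configuration file path
    ]
```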



##########
docs/modules/sort/dataflow_example.md:
##########
@@ -5,307 +5,1717 @@ sidebar_position: 3
 
 # Examples
 
-To make it easier for you to create inlong-sort jobs, here we list some dataflow configuration examples.
+To make it easier for you to create InLong-Sort jobs, here we list some dataflow configuration examples.
 
-## Pulsar to Kafka
+## MySQL to Kafka
 
-Normal example:
+- Single table or sub-database sub-table sync example:
 
 ```json
 {
-    "id": 1,
-    "source_info": {
-        "type": "pulsar",
-        "admin_url": "YOUR_PULSAR_ADMIN_URL",
-        "service_url": "YOUR_PULSAR_SERVICE_URL",
-        "topic": "YOUR_PULSAR_TOPIC",
-        "subscription_name": "debezium2canal",
-        "deserialization_info": {
-            "type": "debezium_json",
-            "ignore_parse_errors": true,
-            "timestamp_format_standard": "ISO_8601"
-        },
-        "fields": [
-            {
-                "type": "base",
-                "name": "name",
-                "format_info": {
-                    "type": "string"
+    "groupId":"1",
+    "streams":[
+        {
+            "streamId":"1",
+            "nodes":[
+                {
+                    "type":"mysqlExtract",
+                    "id":"1",
+                    "name":"mysql_input",
+                    "fields":[
+                        {
+                            "type":"base",
+                            "name":"id",
+                            "formatInfo":{
+                                "type":"long"
+                            }
+                        },
+                        {
+                            "type":"base",
+                            "name":"name",
+                            "formatInfo":{
+                                "type":"string"
+                            }
+                        },
+                        {
+                            "type":"base",
+                            "name":"age",
+                            "formatInfo":{
+                                "type":"int"
+                            }
+                        }
+                    ],
+                    "properties":{
+                        "append-mode":"true"
+                    },
+                    "primaryKey":"id",
+                    "tableNames":[
+                        "YOUR_TABLE"
+                    ],
+                    "hostname":"YOUR_MYSQL_HOST",
+                    "username":"YOUR_USERNAME",
+                    "password":"YOUR_PASSWORD",
+                    "database":"YOUR_DATABASE",
+                    "port":3306,
+                    "incrementalSnapshotEnabled":true
+                },
+                {
+                    "type":"kafkaLoad",
+                    "id":"2",
+                    "name":"kafka_output",
+                    "fields":[
+                        {
+                            "type":"base",
+                            "name":"id",
+                            "formatInfo":{
+                                "type":"long"
+                            }
+                        },
+                        {
+                            "type":"base",
+                            "name":"name",
+                            "formatInfo":{
+                                "type":"string"
+                            }
+                        },
+                        {
+                            "type":"base",
+                            "name":"age",
+                            "formatInfo":{
+                                "type":"int"
+                            }
+                        }
+                    ],
+                    "fieldRelationShips":[
+                        {
+                            "type":"fieldRelationShip",
+                            "inputField":{
+                                "type":"base",
+                                "name":"id",
+                                "formatInfo":{
+                                    "type":"long"
+                                }
+                            },
+                            "outputField":{
+                                "type":"base",
+                                "name":"id",
+                                "formatInfo":{
+                                    "type":"long"
+                                }
+                            }
+                        },
+                        {
+                            "type":"fieldRelationShip",
+                            "inputField":{
+                                "type":"base",
+                                "name":"name",
+                                "formatInfo":{
+                                    "type":"string"
+                                }
+                            },
+                            "outputField":{
+                                "type":"base",
+                                "name":"name",
+                                "formatInfo":{
+                                    "type":"string"
+                                }
+                            }
+                        },
+                        {
+                            "type":"fieldRelationShip",
+                            "inputField":{
+                                "type":"base",
+                                "name":"age",
+                                "formatInfo":{
+                                    "type":"int"
+                                }
+                            },
+                            "outputField":{
+                                "type":"base",
+                                "name":"age",
+                                "formatInfo":{
+                                    "type":"int"
+                                }
+                            }
+                        }
+                    ],
+                    "topic":"YOUR_TOPIC",
+                    "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
+                    "format":{
+                        "type":"jsonFormat",
+                        "failOnMissingField":false,
+                        "ignoreParseErrors":true,
+                        "timestampFormatStandard":"SQL",
+                        "mapNullKeyMode":"DROP",
+                        "mapNullKeyLiteral":"null",
+                        "encodeDecimalAsPlainNumber":true
+                    },
+                    "primaryKey":"id"
                 }
-            },
-            {
-                "type": "base",
-                "name": "age",
-                "format_info": {
-                    "type": "int"
+            ],
+            "relations":[
+                {
+                    "type":"baseRelation",
+                    "inputs":[
+                        "1"
+                    ],
+                    "outputs":[
+                        "2"
+                    ]
                 }
+            ]
+        }
+    ]
+}
+```
+
+- Whole-database migration example:
+
+```json
+{
+  "groupId":"1",
+  "streams":[
+    {
+      "streamId":"1",
+      "nodes":[
+        {
+          "type":"mysqlExtract",
+          "id":"1",
+          "name":"mysql_input",
+          "fields":[
+            {
+              "type":"builtin",
+              "name":"data",
+              "formatInfo":{
+                "type":"string"
+              },
+              "builtinField":"MYSQL_METADATA_DATA"
             }
-        ],
-        "authentication": null
-    },
-    "sink_info": {
-        "type": "kafka",
-        "fields": [
+          ],
+          "properties":{
+            "append-mode":"true",
+            "migrate-all":"true"
+          },
+          "tableNames":[
+            "[\\s\\S]*.*"
+          ],
+          "hostname":"YOUR_MYSQL_HOST",
+          "username":"YOUR_USERNAME",
+          "password":"YOUR_PASSWORD",
+          "database":"[\\s\\S]*.*",
+          "port":3306,
+          "incrementalSnapshotEnabled":false
+        },
+        {
+          "type":"kafkaLoad",
+          "id":"2",
+          "name":"kafka_output",
+          "fields":[
             {
-                "type": "base",
-                "name": "name",
-                "format_info": {
-                    "type": "string"
-                }
-            },
+              "type":"base",
+              "name":"data",
+              "formatInfo":{
+                "type":"string"
+              }
+            }
+          ],
+          "fieldRelationShips":[
             {
-                "type": "base",
-                "name": "age",
-                "format_info": {
-                    "type": "int"
+              "type":"fieldRelationShip",
+              "inputField":{
+                "type":"base",
+                "name":"data",
+                "formatInfo":{
+                  "type":"string"
+                }
+              },
+              "outputField":{
+                "type":"base",
+                "name":"data",
+                "formatInfo":{
+                  "type":"string"
                 }
+              }
             }
-        ],
-        "address": "YOUR_KAFKA_ADDRESS",
-        "topic": "sort_test_canal",
-        "serialization_info": {
-            "type": "canal"
+          ],
+          "topic":"YOUR_TOPIC",
+          "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
+          "format":{
+            "type":"csvFormat",
+            "fieldDelimiter":",",
+            "disableQuoteCharacter":true,
+            "quoteCharacter":null,
+            "allowComments":false,
+            "ignoreParseErrors":true,
+            "arrayElementDelimiter":";",
+            "escapeCharacter":null,
+            "nullLiteral":null
+          }
         }
-    },
-    "properties": {
-        "consumer.bootstrap-mode": "earliest",
-        "transaction.timeout.ms": 900000
+      ],
+      "relations":[
+        {
+          "type":"baseRelation",
+          "inputs":[
+            "1"
+          ],
+          "outputs":[
+            "2"
+          ]
+        }
+      ]
     }
+  ]
 }
 ```
 
-Whole-database migration example:
+## Kafka to Hive
 
 ```json
 {
-    "id": 123,
-    "source_info": {
-        "type": "pulsar",
-        "admin_url": "YOUR_PULSAR_ADMIN_URL",
-        "service_url": "YOUR_PULSAR_SERVICE_URL",
-        "topic": "YOUR_PULSAR_TOPIC",
-        "subscription_name": "whole-db-migration",
-        "deserialization_info": {
-            "type": "debezium_json",
-            "ignore_parse_errors": false,
-            "timestamp_format_standard": "ISO_8601",
-            "include_update_before": true
-        },
-        "fields": [
-            {
-                "type": "builtin",
-                "name": "db",
-                "format_info": {
-                    "type": "string"
-                },
-                "builtin_field": "MYSQL_METADATA_DATABASE"
-            },
-            {
-                "type": "builtin",
-                "name": "table",
-                "format_info": {
-                    "type": "string"
-                },
-                "builtin_field": "MYSQL_METADATA_TABLE"
-            },
-            {
-                "type": "builtin",
-                "name": "mydata",
-                "format_info": {
-                    "type": "string"
-                },
-                "builtin_field": "MYSQL_METADATA_DATA"
-            },
-            {
-                "type": "builtin",
-                "name": "es",
-                "format_info": {
-                    "type": "long"
-                },
-                "builtin_field": "MYSQL_METADATA_EVENT_TIME"
-            },
-            {
-                "type": "builtin",
-                "name": "isDdl",
-                "format_info": {
-                    "type": "boolean"
+    "groupId":"1",
+    "streams":[
+        {
+            "streamId":"1",
+            "nodes":[
+                {
+                    "type":"kafkaExtract",
+                    "id":"1",
+                    "name":"kafka_input",
+                    "fields":[
+                        {
+                            "type":"base",
+                            "name":"id",
+                            "formatInfo":{
+                                "type":"long"
+                            }
+                        },
+                        {
+                            "type":"base",
+                            "name":"name",
+                            "formatInfo":{
+                                "type":"string"
+                            }
+                        },
+                        {
+                            "type":"base",
+                            "name":"age",
+                            "formatInfo":{
+                                "type":"int"
+                            }
+                        }
+                    ],
+                    "topic":"YOUR_TOPIC",
+                    "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
+                    "format":{
+                        "type":"jsonFormat",
+                        "failOnMissingField":false,
+                        "ignoreParseErrors":true,
+                        "timestampFormatStandard":"SQL",
+                        "mapNullKeyMode":"DROP",
+                        "mapNullKeyLiteral":"null",
+                        "encodeDecimalAsPlainNumber":true
+                    },
+                    "scanStartupMode":"EARLIEST_OFFSET"
                 },
-                "builtin_field": "MYSQL_METADATA_IS_DDL"
-            },
-            {
-                "type": "builtin",
-                "name": "type",
-                "format_info": {
-                    "type": "string"
+                {
+                    "type":"hiveLoad",
+                    "id":"2",
+                    "name":"hive_output",
+                    "fields":[
+                        {
+                            "type":"base",
+                            "name":"id",
+                            "formatInfo":{
+                                "type":"long"
+                            }
+                        },
+                        {
+                            "type":"base",
+                            "name":"name",
+                            "formatInfo":{
+                                "type":"string"
+                            }
+                        },
+                        {
+                            "type":"base",
+                            "name":"age",
+                            "formatInfo":{
+                                "type":"int"
+                            }
+                        }
+                    ],
+                    "fieldRelationShips":[
+                        {
+                            "type":"fieldRelationShip",
+                            "inputField":{
+                                "type":"base",
+                                "name":"id",
+                                "formatInfo":{
+                                    "type":"long"
+                                }
+                            },
+                            "outputField":{
+                                "type":"base",
+                                "name":"id",
+                                "formatInfo":{
+                                    "type":"long"
+                                }
+                            }
+                        },
+                        {
+                            "type":"fieldRelationShip",
+                            "inputField":{
+                                "type":"base",
+                                "name":"name",
+                                "formatInfo":{
+                                    "type":"string"
+                                }
+                            },
+                            "outputField":{
+                                "type":"base",
+                                "name":"name",
+                                "formatInfo":{
+                                    "type":"string"
+                                }
+                            }
+                        },
+                        {
+                            "type":"fieldRelationShip",
+                            "inputField":{
+                                "type":"base",
+                                "name":"age",
+                                "formatInfo":{
+                                    "type":"int"
+                                }
+                            },
+                            "outputField":{
+                                "type":"base",
+                                "name":"age",
+                                "formatInfo":{
+                                    "type":"int"
+                                }
+                            }
+                        }
+                    ],
+                    "sinkParallelism":1,
+                    "catalogName":"hivecatlog",
+                    "database":"YOUR_DATABASE",
+                    "tableName":"YOUR_TABLE_NAME",
+                    "hiveConfDir":"YOUR_HIVE_CONF_DIR",
+                    "hiveVersion":"3.1.2",
+                    "hadoopConfDir":"YOUR_HADOOP_CONF_DIR"
+                }
+            ],
+            "relations":[
+                {
+                    "type":"baseRelation",
+                    "inputs":[
+                        "1"
+                    ],
+                    "outputs":[
+                        "2"
+                    ]
+                }
+            ]
+        }
+    ]
+}
+```
+
+## Transform examples
+
+Currently only supports string split, string regex replace, string regex replace first matched value, data distinct,data filter, regular join, and etc.

Review Comment:
   Add a link to the supported transforms:
   https://github.com/yunqingmoswu/incubator-inlong-website/blob/INLONG-380/docs/modules/sort/overview.md#supported-transform
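The group info layout in the `dataflow_example.md` hunk works as follows:
- `nodes` and `relations` are nested under each entry of `streams`.
- Each `baseRelation` wires input node ids to output node ids.
- Each load node maps its fields through `fieldRelationShips`.

The Python sketch below builds a skeleton of the MySQL to Kafka example and runs two consistency checks on it. The helper names and the checks are assumptions for illustration, not InLong-Sort's own validation. The skeleton omits the connection settings (`hostname`, `topic`, `format` and the like), so it is not submittable as-is.

```python
from typing import Dict, List


def base_field(name: str, type_name: str) -> Dict:
    """A "base" field entry, shaped like those in the dataflow examples."""
    return {"type": "base", "name": name, "formatInfo": {"type": type_name}}


def identity_relationships(fields: List[Dict]) -> List[Dict]:
    """Map every input field to an output field with the same name and type."""
    return [{"type": "fieldRelationShip", "inputField": f, "outputField": f}
            for f in fields]


def check_group_info(group_info: Dict) -> List[str]:
    """Return problems found in a group info dict; an empty list means none.

    Only two illustrative checks are made: every node id used by a relation
    exists in the same stream, and every outputField of a node is declared
    in that node's own "fields".
    """
    problems: List[str] = []
    for stream in group_info["streams"]:
        node_ids = {node["id"] for node in stream["nodes"]}
        for relation in stream.get("relations", []):
            for node_id in relation["inputs"] + relation["outputs"]:
                if node_id not in node_ids:
                    problems.append(
                        f"stream {stream['streamId']}: unknown node {node_id}")
        for node in stream["nodes"]:
            declared = {f["name"] for f in node.get("fields", [])}
            for mapping in node.get("fieldRelationShips", []):
                name = mapping["outputField"]["name"]
                if name not in declared:
                    problems.append(
                        f"node {node['id']}: output field {name} not declared")
    return problems


# Skeleton of the MySQL to Kafka example; connection settings are omitted.
columns = [base_field("id", "long"), base_field("name", "string"),
           base_field("age", "int")]
group_info = {
    "groupId": "1",
    "streams": [{
        "streamId": "1",
        "nodes": [
            {"type": "mysqlExtract", "id": "1", "name": "mysql_input",
             "fields": columns, "primaryKey": "id"},
            {"type": "kafkaLoad", "id": "2", "name": "kafka_output",
             "fields": columns,
             "fieldRelationShips": identity_relationships(columns)},
        ],
        "relations": [{"type": "baseRelation", "inputs": ["1"], "outputs": ["2"]}],
    }],
}
```

`json.dumps(group_info, indent=2)` renders the skeleton with the same key names as the JSON in the diff.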


