You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/16 07:21:29 UTC
[incubator-inlong-website] branch master updated: [INLONG-416][Sort] Alter Sort example doc (#417)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong-website.git
The following commit(s) were added to refs/heads/master by this push:
new f0ba3ef31 [INLONG-416][Sort] Alter Sort example doc (#417)
f0ba3ef31 is described below
commit f0ba3ef312aab25fe071873beb2102dfe3bceab0
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Thu Jun 16 15:21:24 2022 +0800
[INLONG-416][Sort] Alter Sort example doc (#417)
---
docs/modules/sort/datastream_example.md | 1823 ++-----------------
.../current/modules/sort/datastream_example.md | 1827 ++------------------
2 files changed, 271 insertions(+), 3379 deletions(-)
diff --git a/docs/modules/sort/datastream_example.md b/docs/modules/sort/datastream_example.md
index ccffd67df..1ee497ac5 100644
--- a/docs/modules/sort/datastream_example.md
+++ b/docs/modules/sort/datastream_example.md
@@ -3,1719 +3,164 @@ title: DataStream Example
sidebar_position: 3
---
-# Examples
+## Examples
To make it easier for you to create InLong-Sort jobs, here we list some data stream configuration examples.
+The following sections describe how to use InLong Sort through SQL, the Dashboard, and the Manager Client Tools.
-## MySQL to Kafka
+## Precondition
-- Single table or sub-database sub-table sync example:
+Please confirm that the following environment is available:
+* JDK 1.8.x
+* Flink 1.13.5
+* MySQL
+* Kafka
+* Hadoop
+* Hive 3.x
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"mysqlExtract",
- "id":"1",
- "name":"mysql_input",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "properties":{
- "append-mode":"true"
- },
- "primaryKey":"id",
- "tableNames":[
- "YOUR_TABLE"
- ],
- "hostname":"YOUR_MYSQL_HOST",
- "username":"YOUR_USERNAME",
- "password":"YOUR_PASSWORD",
- "database":"YOUR_DATABASE",
- "port":3306,
- "incrementalSnapshotEnabled":true
- },
- {
- "type":"kafkaLoad",
- "id":"2",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "primaryKey":"id"
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- }
- ]
- }
- ]
-}
-```
+If there is no available environment, please refer to the following steps:
-- Whole-database migration example:
+Step1. If no Flink cluster environment is available, you may build a Flink Standalone single cluster for use. [Flink Standalone Mode](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/)
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"mysqlExtract",
- "id":"1",
- "name":"mysql_input",
- "fields":[
- {
- "type":"builtin",
- "name":"data",
- "formatInfo":{
- "type":"string"
- },
- "builtinField":"MYSQL_METADATA_DATA"
- }
- ],
- "properties":{
- "append-mode":"true",
- "migrate-all":"true"
- },
- "tableNames":[
- "[\\s\\S]*.*"
- ],
- "hostname":"YOUR_MYSQL_HOST",
- "username":"YOUR_USERNAME",
- "password":"YOUR_PASSWORD",
- "database":"[\\s\\S]*.*",
- "port":3306,
- "incrementalSnapshotEnabled":false
- },
- {
- "type":"kafkaLoad",
- "id":"2",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"data",
- "formatInfo":{
- "type":"string"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"data",
- "formatInfo":{
- "type":"string"
- }
- },
- "outputField":{
- "type":"base",
- "name":"data",
- "formatInfo":{
- "type":"string"
- }
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"csvFormat",
- "fieldDelimiter":",",
- "disableQuoteCharacter":true,
- "quoteCharacter":null,
- "allowComments":false,
- "ignoreParseErrors":true,
- "arrayElementDelimiter":";",
- "escapeCharacter":null,
- "nullLiteral":null
- }
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- }
- ]
- }
- ]
-}
-```
+Step2. If no MySQL environment is available or binlog is not enabled, you need to install MySQL and turn on binlog. [MySQL Installation Guide](https://dev.mysql.com/doc/mysql-installation-excerpt/5.7/en/) and [MySQL binlog](https://dev.mysql.com/doc/refman/5.7/en/replication-howto-masterbaseconfig.html)
+(You can also refer directly to the InLong MySQL Extract Node documentation for how to enable the MySQL binlog.)
-## Kafka to Hive
+Step3. If no Kafka and Hadoop environment is available, you need to install Kafka and Hadoop. [Kafka Installation Guide](https://kafka.apache.org/quickstart) and [Hadoop Installation Guide](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html)
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"kafkaExtract",
- "id":"1",
- "name":"kafka_input",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"hiveLoad",
- "id":"2",
- "name":"hive_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "sinkParallelism":1,
- "catalogName":"hivecatlog",
- "database":"YOUR_DATABASE",
- "tableName":"YOUR_TABLE_NAME",
- "hiveConfDir":"YOUR_HIVE_CONF_DIR",
- "hiveVersion":"3.1.2",
- "hadoopConfDir":"YOUR_HADOOP_CONF_DIR"
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- }
- ]
- }
- ]
-}
-```
+Step4. If no Hive environment is available, you need to install Hive and turn on metastore service. [Hive Installation Guide](https://cwiki.apache.org/confluence/display/Hive/GettingStarted)
-## Transform examples
+Step5. Download the InLong installation package [inlong-distribution-(version)-incubating-bin.tar.gz](https://inlong.apache.org/download/main)
+and choose the MySQL, Kafka, and Hive connector dependencies from [inlong-distribution-(version)-incubating-sort-connectors.tar.gz](https://inlong.apache.org/download/main)
-Currently only supports string split, string regex replace, string regex replace first matched value, data distinct,data filter, regular join, and etc.
-The following takes kafka to kafka as an example to illustrate the usage of each transform.
+Step6. Put sort-dist-[version].jar and the MySQL, Kafka, and Hive connector jars into FLINK_HOME/lib.
-- String Split:
+Note: sort-dist-[version].jar is located in the inlong-sort package of [inlong-distribution-(version)-incubating-bin.tar.gz](https://inlong.apache.org/download/main)
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"kafkaExtract",
- "id":"1",
- "name":"kafka_input",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"kafkaLoad",
- "id":"2",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"splitIndex",
- "field":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "separator":{
- "type":"stringConstant",
- "value":"YOUR_SPLIT_STR"
- },
- "index":{
- "type":"constant",
- "value":0
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- }
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- }
- ]
- }
- ]
-}
-```
+## Usage for SQL API
+
+This example defines the data flow for a single table. (mysql-->kafka-->hive)
-- String Regular Replace:
+### MySQL to Kafka
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"kafkaExtract",
- "id":"1",
- "name":"kafka_input",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"kafkaLoad",
- "id":"2",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"regexpReplace",
- "field":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "regex":{
- "type":"stringConstant",
- "value":"YOUR_REPLACE_REGEX"
- },
- "replacement":{
- "type":"stringConstant",
- "value":"YOUR_REPLACEMENT"
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- }
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- }
- ]
- }
- ]
-}
+Single table sync example:
+
+```shell
+./bin/flink run -c org.apache.inlong.sort.Entrance FLINK_HOME/lib/sort-dist-[version].jar \
+--sql.script.file /YOUR_SQL_SCRIPT_DIR/mysql-to-kafka.sql
```
-- String Regular Replace First Matched Value:
+* mysql-to-kafka.sql
+
+```sql
+CREATE TABLE `table_1`(
+ PRIMARY KEY (`id`) NOT ENFORCED,
+ `id` BIGINT,
+ `name` STRING,
+ `age` INT,
+ `salary` FLOAT,
+ `ts` TIMESTAMP(2),
+ `event_type` STRING)
+ WITH (
+ 'append-mode' = 'true',
+ 'connector' = 'mysql-cdc-inlong',
+ 'hostname' = 'localhost',
+ 'username' = 'root',
+ 'password' = 'password',
+ 'database-name' = 'dbName',
+ 'table-name' = 'tableName'
+);
+
+CREATE TABLE `table_2`(
+ `id` BIGINT,
+ `name` STRING,
+ `age` INT,
+ `salary` FLOAT,
+ `ts` TIMESTAMP(2))
+ WITH (
+ 'topic' = 'topicName',-- Your kafka topic
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'connector' = 'kafka',
+ 'json.timestamp-format.standard' = 'SQL',
+ 'json.encode.decimal-as-plain-number' = 'true',
+ 'json.map-null-key.literal' = 'null',
+ 'json.ignore-parse-errors' = 'true',
+ 'json.map-null-key.mode' = 'DROP',
+ 'format' = 'json',
+ 'json.fail-on-missing-field' = 'false'
+);
+
+INSERT INTO `table_2`
+ SELECT
+ `id` AS `id`,
+ `name` AS `name`,
+ `age` AS `age`,
+ CAST(NULL as FLOAT) AS `salary`,
+ `ts` AS `ts`
+ FROM `table_1`;
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"kafkaExtract",
- "id":"1",
- "name":"kafka_input",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"kafkaLoad",
- "id":"2",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"regexpReplaceFirst",
- "field":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "regex":{
- "type":"stringConstant",
- "value":"YOUR_REPLACE_REGEX"
- },
- "replacement":{
- "type":"stringConstant",
- "value":"YOUR_REPLACEMENT"
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- }
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- }
- ]
- }
- ]
-}
```
-- Data Filter:
+### Kafka to Hive
+
+**Note:** First, you need to create the `user` table in Hive.
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"kafkaExtract",
- "id":"1",
- "name":"kafka_input",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"kafkaLoad",
- "id":"2",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "filters":[
- {
- "type":"singleValueFilter",
- "logicOperator":{
- "type":"empty"
- },
- "source":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "compareOperator":{
- "type":"moreThanOrEqual"
- },
- "target":{
- "type":"constant",
- "value":18
- }
- },
- {
- "type":"singleValueFilter",
- "logicOperator":{
- "type":"and"
- },
- "source":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "compareOperator":{
- "type":"moreThanOrEqual"
- },
- "target":{
- "type":"constant",
- "value":25
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- }
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- }
- ]
- }
- ]
-}
+```shell
+./bin/flink run -c org.apache.inlong.sort.Entrance FLINK_HOME/lib/sort-dist-[version].jar \
+--sql.script.file /YOUR_SQL_SCRIPT_DIR/kafka-to-hive.sql
```
-- Data Distinct:
+* kafka-to-hive.sql
+
+```sql
+CREATE TABLE `table_1`(
+ `id` BIGINT,
+ `name` STRING,
+ `age` INT,
+ `salary` FLOAT,
+ `ts` TIMESTAMP(2))
+ WITH (
+ 'topic' = 'topicName',-- Your kafka topic
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'connector' = 'kafka',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'json.timestamp-format.standard' = 'SQL',
+ 'json.encode.decimal-as-plain-number' = 'true',
+ 'json.map-null-key.literal' = 'null',
+ 'json.ignore-parse-errors' = 'true',
+ 'json.map-null-key.mode' = 'DROP',
+ 'format' = 'json',
+ 'json.fail-on-missing-field' = 'false',
+ 'properties.group.id' = 'groupId'-- Your group id
+);
+
+CREATE TABLE `user`(
+ `id` BIGINT,
+ `name` STRING,
+ `age` INT,
+ `salary` FLOAT,
+ `ts` TIMESTAMP(9))
+ WITH (
+ 'connector' = 'hive',
+ 'default-database' = 'default',
+ 'hive-version' = '3.1.2',
+ 'hive-conf-dir' = 'hdfs://ip:9000/.../hive-site.xml' -- Put your hive-site.xml into HDFS
+);
+
+INSERT INTO `user`
+ SELECT
+ `id` AS `id`,
+ `name` AS `name`,
+ `age` AS `age`,
+ CAST(NULL as FLOAT) AS `salary`,
+ `ts` AS `ts`
+ FROM `table_1`;
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"kafkaExtract",
- "id":"1",
- "name":"kafka_input",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- {
- "type":"builtin",
- "name":"proctime",
- "formatInfo":{
- "type":"time",
- "format":"HH:mm:ss"
- },
- "builtinField":"PROCESS_TIME"
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"distinct",
- "id":"2",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "distinctFields":[
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- ],
- "orderField":{
- "type":"base",
- "name":"proctime",
- "formatInfo":{
- "type":"timestamp",
- "precision":2,
- "format":"yyyy-MM-dd HH:mm:ss"
- }
- },
- "orderDirection":"ASC"
- },
- {
- "type":"kafkaLoad",
- "id":"3",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- }
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- },
- {
- "type":"baseRelation",
- "inputs":[
- "2"
- ],
- "outputs":[
- "3"
- ]
- }
- ]
- }
- ]
-}
```
+Note: Of course you can also put all the SQL in one file.
-- Regular Join:
+## Usage for Dashboard
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"kafkaExtract",
- "id":"1",
- "name":"kafka_input_1",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"kafkaExtract",
- "id":"2",
- "name":"kafka_input_2",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"baseTransform",
- "id":"3",
- "name":"transform_node",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- },
- "nodeId":"1"
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- },
- "nodeId":"1"
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- },
- "nodeId":"2"
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ]
- },
- {
- "type":"kafkaLoad",
- "id":"4",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- },
- "nodeId":"1"
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- },
- "nodeId":"1"
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- },
- "nodeId":"2"
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- }
- }
- ],
- "relations":[
- {
- "type":"innerJoin",
- "inputs":[
- "1",
- "2"
- ],
- "outputs":[
- "3"
- ],
- "joinConditionMap":{
- "2":[
- {
- "type":"singleValueFilter",
- "logicOperator":{
- "type":"empty"
- },
- "source":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- },
- "nodeId":"1"
- },
- "compareOperator":{
- "type":"equal"
- },
- "target":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- },
- "nodeId":"2"
- }
- }
- ]
- }
- },
- {
- "type":"baseRelation",
- "inputs":[
- "3"
- ],
- "outputs":[
- "4"
- ]
- }
- ]
- }
- ]
-}
-```
+The underlying capabilities are already available; Dashboard support for them will be added in the future.
+
+## Usage for Manager Client Tools
+TODO: It will be supported in the future.
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/sort/datastream_example.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/sort/datastream_example.md
index 15e2624b0..6bba02623 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/sort/datastream_example.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/sort/datastream_example.md
@@ -3,1717 +3,164 @@ title: 数据流示例
sidebar_position: 3
---
-# 流配置示例
+## 示例
-这里列出一些流配置示例以做参考。
+为了更容易创建InLong-Sort作业,这里我们列出了一些数据流配置示例。
+下面将介绍InLong-Sort的SQL、Dashboard、Manager客户端工具的使用。
-## 读 MySQL 写 Kafka
+## 前置配置
-单表或者分库分表同步配置示例如下:
+请确定是否有以下环境:
+* JDK 1.8.x
+* Flink 1.13.5
+* MySQL
+* Kafka
+* Hadoop
+* Hive 3.x
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"mysqlExtract",
- "id":"1",
- "name":"mysql_input",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "properties":{
- "append-mode":"true"
- },
- "primaryKey":"id",
- "tableNames":[
- "YOUR_TABLE"
- ],
- "hostname":"YOUR_MYSQL_HOST",
- "username":"YOUR_USERNAME",
- "password":"YOUR_PASSWORD",
- "database":"YOUR_DATABASE",
- "port":3306,
- "incrementalSnapshotEnabled":true
- },
- {
- "type":"kafkaLoad",
- "id":"2",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "primaryKey":"id"
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- }
- ]
- }
- ]
-}
-```
+如果没有相关环境可参见以下安装步骤:
-整库迁移配置示例:
+Step1. 如果没有可用的 Flink 集群环境,为了运行示例只需要构建一个 Flink Standalone 单集群。[Flink Standalone Mode](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/)
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"mysqlExtract",
- "id":"1",
- "name":"mysql_input",
- "fields":[
- {
- "type":"builtin",
- "name":"data",
- "formatInfo":{
- "type":"string"
- },
- "builtinField":"MYSQL_METADATA_DATA"
- }
- ],
- "properties":{
- "append-mode":"true",
- "migrate-all":"true"
- },
- "tableNames":[
- "[\\s\\S]*.*"
- ],
- "hostname":"YOUR_MYSQL_HOST",
- "username":"YOUR_USERNAME",
- "password":"YOUR_PASSWORD",
- "database":"[\\s\\S]*.*",
- "port":3306,
- "incrementalSnapshotEnabled":false
- },
- {
- "type":"kafkaLoad",
- "id":"2",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"data",
- "formatInfo":{
- "type":"string"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"data",
- "formatInfo":{
- "type":"string"
- }
- },
- "outputField":{
- "type":"base",
- "name":"data",
- "formatInfo":{
- "type":"string"
- }
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"csvFormat",
- "fieldDelimiter":",",
- "disableQuoteCharacter":true,
- "quoteCharacter":null,
- "allowComments":false,
- "ignoreParseErrors":true,
- "arrayElementDelimiter":";",
- "escapeCharacter":null,
- "nullLiteral":null
- }
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- }
- ]
- }
- ]
-}
-```
+Step2. 如果没有 MySQL 环境,需要安装 MySQL 及开启 binlog 能力。[MySQL Installation Guide](https://dev.mysql.com/doc/mysql-installation-excerpt/5.7/en/) 和 [MySQL binlog](https://dev.mysql.com/doc/refman/5.7/en/replication-howto-masterbaseconfig.html)
+(关于开启 MySQL binlog 能力也可以参考 Inlong MySQL 抽取节点配置的文档说明。)
-## 读 Kafka 写 Hive
+Step3. 本地如果没有 Kafka 或者 Hadoop 集群环境,需要安装单节点的 Kafka 和 Hadoop 集群。[Kafka Installation Guide](https://kafka.apache.org/quickstart) 和 [Hadoop Installation Guide](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html)
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"kafkaExtract",
- "id":"1",
- "name":"kafka_input",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"hiveLoad",
- "id":"2",
- "name":"hive_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "sinkParallelism":1,
- "catalogName":"hivecatlog",
- "database":"YOUR_DATABASE",
- "tableName":"YOUR_TABLE_NAME",
- "hiveConfDir":"YOUR_HIVE_CONF_DIR",
- "hiveVersion":"3.1.2",
- "hadoopConfDir":"YOUR_HADOOP_CONF_DIR"
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- }
- ]
- }
- ]
-}
-```
+Step4. 下载安装 Hive 及开启 Hive metastore 服务。[Hive Installation Guide](https://cwiki.apache.org/confluence/display/Hive/GettingStarted)
-## Transform 示例
+Step5. 下载 Inlong 安装包 [inlong-distribution-(version)-incubating-bin.tar.gz](https://inlong.apache.org/download/main)
+和 Inlong Sort Connectors 依赖包 [inlong-distribution-(version)-incubating-sort-connectors.tar.gz](https://inlong.apache.org/download/main) ,示例需要的MySQL、Kafka、Hive 等 connector 可以在 Connectors 中获取。
+
+Step6. 把 sort-dist-[version].jar 和 MySQL,Kafka,Hive 的 connector jar 放到 FLINK_HOME/lib 中。
-目前我们仅支持字符串分割,字符串正则替换,字符串正则替换第一个匹配值、数据去重、数据过滤、常规 Join 等,
-下面以读 Kafka 写 Kafka 为例来说明各个 Transform 的具体用法。
-- 字符串分割:
+备注: 其中 sort-dist-[version].jar 在 inlong-sort 包中。 [inlong-distribution-(version)-incubating-bin.tar.gz](https://inlong.apache.org/download/main)
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"kafkaExtract",
- "id":"1",
- "name":"kafka_input",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"kafkaLoad",
- "id":"2",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"splitIndex",
- "field":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "separator":{
- "type":"stringConstant",
- "value":"YOUR_SPLIT_STR"
- },
- "index":{
- "type":"constant",
- "value":0
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- }
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- }
- ]
- }
- ]
-}
-```
+## 使用 SQL API 方式
+
+示例构建了 MySQL --> Kafka --> Hive 的数据流,为了便于理解流程执行过程进行了拆解。
-- 字符串正则替换:
+### 读 MySQL 写 Kafka
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"kafkaExtract",
- "id":"1",
- "name":"kafka_input",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"kafkaLoad",
- "id":"2",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"regexpReplace",
- "field":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "regex":{
- "type":"stringConstant",
- "value":"YOUR_REPLACE_REGEX"
- },
- "replacement":{
- "type":"stringConstant",
- "value":"YOUR_REPLACEMENT"
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- }
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- }
- ]
- }
- ]
-}
+单表同步配置示例如下:
+
+```shell
+./bin/flink run -c org.apache.inlong.sort.Entrance FLINK_HOME/lib/sort-dist-[version].jar \
+--sql.script.file /YOUR_SQL_SCRIPT_DIR/mysql-to-kafka.sql
```
-- 字符串正则替换第一个匹配值:
+* mysql-to-kafka.sql
+
+```sql
+CREATE TABLE `table_1`(
+ PRIMARY KEY (`id`) NOT ENFORCED,
+ `id` BIGINT,
+ `name` STRING,
+ `age` INT,
+ `salary` FLOAT,
+ `ts` TIMESTAMP(2),
+ `event_type` STRING)
+ WITH (
+ 'append-mode' = 'true',
+ 'connector' = 'mysql-cdc-inlong',
+ 'hostname' = 'localhost',
+ 'username' = 'root',
+ 'password' = 'password',
+ 'database-name' = 'dbName',
+ 'table-name' = 'tableName'
+);
+
+CREATE TABLE `table_2`(
+ `id` BIGINT,
+ `name` STRING,
+ `age` INT,
+ `salary` FLOAT,
+ `ts` TIMESTAMP(2))
+ WITH (
+ 'topic' = 'topicName',-- Your kafka topic
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'connector' = 'kafka',
+ 'json.timestamp-format.standard' = 'SQL',
+ 'json.encode.decimal-as-plain-number' = 'true',
+ 'json.map-null-key.literal' = 'null',
+ 'json.ignore-parse-errors' = 'true',
+ 'json.map-null-key.mode' = 'DROP',
+ 'format' = 'json',
+ 'json.fail-on-missing-field' = 'false'
+);
+
+INSERT INTO `table_2`
+ SELECT
+ `id` AS `id`,
+ `name` AS `name`,
+ `age` AS `age`,
+ CAST(NULL as FLOAT) AS `salary`,
+ `ts` AS `ts`
+ FROM `table_1`;
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"kafkaExtract",
- "id":"1",
- "name":"kafka_input",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"kafkaLoad",
- "id":"2",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"regexpReplaceFirst",
- "field":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "regex":{
- "type":"stringConstant",
- "value":"YOUR_REPLACE_REGEX"
- },
- "replacement":{
- "type":"stringConstant",
- "value":"YOUR_REPLACEMENT"
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- }
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- }
- ]
- }
- ]
-}
```
-- 数据过滤:
+### 读 Kafka 写 Hive
+
+**注意:** 首先需要在 hive 中创建 user 表。
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"kafkaExtract",
- "id":"1",
- "name":"kafka_input",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"kafkaLoad",
- "id":"2",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "filters":[
- {
- "type":"singleValueFilter",
- "logicOperator":{
- "type":"empty"
- },
- "source":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "compareOperator":{
- "type":"moreThanOrEqual"
- },
- "target":{
- "type":"constant",
- "value":18
- }
- },
- {
- "type":"singleValueFilter",
- "logicOperator":{
- "type":"and"
- },
- "source":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "compareOperator":{
- "type":"moreThanOrEqual"
- },
- "target":{
- "type":"constant",
- "value":25
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- }
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- }
- ]
- }
- ]
-}
+```shell
+./bin/flink run -c org.apache.inlong.sort.Entrance FLINK_HOME/lib/sort-dist-[version].jar \
+--sql.script.file /YOUR_SQL_SCRIPT_DIR/kafka-to-hive.sql
```
-- 数据去重:
+* kafka-to-hive.sql
+
+```sql
+CREATE TABLE `table_1`(
+ `id` BIGINT,
+ `name` STRING,
+ `age` INT,
+ `salary` FLOAT,
+    `ts` TIMESTAMP(2))
+ WITH (
+ 'topic' = 'topicName',-- Your kafka topic
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'connector' = 'kafka',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'json.timestamp-format.standard' = 'SQL',
+ 'json.encode.decimal-as-plain-number' = 'true',
+ 'json.map-null-key.literal' = 'null',
+ 'json.ignore-parse-errors' = 'true',
+ 'json.map-null-key.mode' = 'DROP',
+ 'format' = 'json',
+ 'json.fail-on-missing-field' = 'false',
+ 'properties.group.id' = 'groupId'-- Your group id
+);
+
+CREATE TABLE `user`(
+ `id` BIGINT,
+ `name` STRING,
+ `age` INT,
+ `salary` FLOAT,
+ `ts` TIMESTAMP(9))
+ WITH (
+ 'connector' = 'hive',
+ 'default-database' = 'default',
+ 'hive-version' = '3.1.2',
+ 'hive-conf-dir' = 'hdfs://ip:9000/.../hive-site.xml' -- Put your hive-site.xml into HDFS
+);
+
+INSERT INTO `user`
+ SELECT
+ `id` AS `id`,
+ `name` AS `name`,
+ `age` AS `age`,
+ CAST(NULL as FLOAT) AS `salary`,
+ `ts` AS `ts`
+ FROM `table_1`;
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"kafkaExtract",
- "id":"1",
- "name":"kafka_input",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- {
- "type":"builtin",
- "name":"proctime",
- "formatInfo":{
- "type":"time",
- "format":"HH:mm:ss"
- },
- "builtinField":"PROCESS_TIME"
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"distinct",
- "id":"2",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "distinctFields":[
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- ],
- "orderField":{
- "type":"base",
- "name":"proctime",
- "formatInfo":{
- "type":"timestamp",
- "precision":2,
- "format":"yyyy-MM-dd HH:mm:ss"
- }
- },
- "orderDirection":"ASC"
- },
- {
- "type":"kafkaLoad",
- "id":"3",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- }
- }
- ],
- "relations":[
- {
- "type":"baseRelation",
- "inputs":[
- "1"
- ],
- "outputs":[
- "2"
- ]
- },
- {
- "type":"baseRelation",
- "inputs":[
- "2"
- ],
- "outputs":[
- "3"
- ]
- }
- ]
- }
- ]
-}
```
+备注:以上过程所有的 SQL 可以放在一个文件中提交执行。
+
+## 使用 Inlong Dashboard 方式
+
+目前 Dashboard 支持文件采集同步的方式,以上数据源可视化配置方式正在开发中。
-- 常规 Join :
+## 使用 Manager Client Tools 方式
-```json
-{
- "groupId":"1",
- "streams":[
- {
- "streamId":"1",
- "nodes":[
- {
- "type":"kafkaExtract",
- "id":"1",
- "name":"kafka_input_1",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"kafkaExtract",
- "id":"2",
- "name":"kafka_input_2",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- },
- "scanStartupMode":"EARLIEST_OFFSET"
- },
- {
- "type":"baseTransform",
- "id":"3",
- "name":"transform_node",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- },
- "nodeId":"1"
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- },
- "nodeId":"1"
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- },
- "nodeId":"2"
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ]
- },
- {
- "type":"kafkaLoad",
- "id":"4",
- "name":"kafka_output",
- "fields":[
- {
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- },
- {
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- },
- {
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- ],
- "fieldRelationShips":[
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- },
- "nodeId":"1"
- },
- "outputField":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- },
- "nodeId":"1"
- },
- "outputField":{
- "type":"base",
- "name":"name",
- "formatInfo":{
- "type":"string"
- }
- }
- },
- {
- "type":"fieldRelationShip",
- "inputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- },
- "nodeId":"2"
- },
- "outputField":{
- "type":"base",
- "name":"age",
- "formatInfo":{
- "type":"int"
- }
- }
- }
- ],
- "topic":"YOUR_TOPIC",
- "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
- "format":{
- "type":"jsonFormat",
- "failOnMissingField":false,
- "ignoreParseErrors":true,
- "timestampFormatStandard":"SQL",
- "mapNullKeyMode":"DROP",
- "mapNullKeyLiteral":"null",
- "encodeDecimalAsPlainNumber":true
- }
- }
- ],
- "relations":[
- {
- "type":"innerJoin",
- "inputs":[
- "1",
- "2"
- ],
- "outputs":[
- "3"
- ],
- "joinConditionMap":{
- "2":[
- {
- "type":"singleValueFilter",
- "logicOperator":{
- "type":"empty"
- },
- "source":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- },
- "nodeId":"1"
- },
- "compareOperator":{
- "type":"equal"
- },
- "target":{
- "type":"base",
- "name":"id",
- "formatInfo":{
- "type":"long"
- },
- "nodeId":"2"
- }
- }
- ]
- }
- },
- {
- "type":"baseRelation",
- "inputs":[
- "3"
- ],
- "outputs":[
- "4"
- ]
- }
- ]
- }
- ]
-}
-```
\ No newline at end of file
+TODO: 未来发布的版本将会支持。