You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/06/24 10:31:16 UTC
[incubator-seatunnel] branch dev updated: [Feature][Doc] Add Flink SQL module to website. #2021 (#2056)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c83a02b6b [Feature][Doc] Add Flink SQL module to website. #2021 (#2056)
c83a02b6b is described below
commit c83a02b6b146b00e3f6c99699019ea93dcf3a6af
Author: Wanghuan <im...@126.com>
AuthorDate: Fri Jun 24 18:31:10 2022 +0800
[Feature][Doc] Add Flink SQL module to website. #2021 (#2056)
* docs error
* [Feature][Doc] Add Flink SQL module to website. #2021
* [Feature][Doc] Add Flink SQL module to website. #2021
Co-authored-by: wanghuan2054 <49...@qq.com>
---
docs/en/connector/flink-sql/usage.md | 275 +++++++++++++++++++++++++++++++++++
docs/sidebars.js | 18 +++
2 files changed, 293 insertions(+)
diff --git a/docs/en/connector/flink-sql/usage.md b/docs/en/connector/flink-sql/usage.md
new file mode 100644
index 000000000..3ffb93d9f
--- /dev/null
+++ b/docs/en/connector/flink-sql/usage.md
@@ -0,0 +1,275 @@
+# How to use the Flink SQL module
+
+## Usage
+
+### 1. Command Entrypoint
+
+```bash
+bin/start-seatunnel-sql.sh
+```
+
+### 2. SeaTunnel config
+
+Rename the file `flink.sql.conf.template` in the `config/` directory to `flink.sql.conf`:
+
+```bash
+mv flink.sql.conf.template flink.sql.conf
+```
+
+Prepare a SeaTunnel config file with the following content:
+
+```sql
+SET table.dml-sync = true;
+
+CREATE TABLE events (
+ f_type INT,
+ f_uid INT,
+ ts AS localtimestamp,
+ WATERMARK FOR ts AS ts
+) WITH (
+ 'connector' = 'datagen',
+ 'rows-per-second'='5',
+ 'fields.f_type.min'='1',
+ 'fields.f_type.max'='5',
+ 'fields.f_uid.min'='1',
+ 'fields.f_uid.max'='1000'
+);
+
+CREATE TABLE print_table (
+ type INT,
+ uid INT,
+ lstmt TIMESTAMP
+) WITH (
+ 'connector' = 'print',
+ 'sink.parallelism' = '1'
+);
+
+INSERT INTO print_table SELECT * FROM events where f_type = 1;
+```
+
+### 3. Run the job
+
+#### Standalone Cluster
+
+```bash
+bin/start-seatunnel-sql.sh --config config/flink.sql.conf
+
+# -p 2 specifies that the parallelism of flink job is 2. You can also specify more parameters, use flink run -h to view
+bin/start-seatunnel-flink.sh \
+-p 2 \
+--config config/flink.sql.conf
+```
+
+#### Yarn Cluster
+
+```bash
+bin/start-seatunnel-sql.sh -m yarn-cluster --config config/flink.sql.conf
+
+bin/start-seatunnel-sql.sh -t yarn-per-job --config config/flink.sql.conf
+
+# -p 2 specifies that the parallelism of flink job is 2. You can also specify more parameters, use flink run -h to view
+bin/start-seatunnel-flink.sh \
+-p 2 \
+-m yarn-cluster \
+--config config/flink.sql.conf
+```
+
+#### Other Options
+
+* `-p 2` specifies that the job parallelism is `2`
+
+```bash
+bin/start-seatunnel-sql.sh -p 2 --config config/flink.sql.conf
+```
+
+## Examples
+
+1. How to implement a Flink SQL interval join with the SeaTunnel flink-sql module
+
+`intervaljoin.sql.conf`
+
+```sql
+CREATE TABLE basic (
+ `id` BIGINT,
+ `name` STRING,
+ `ts` STRING
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'basic',
+ 'properties.bootstrap.servers' = 'XX.XX.XX.XX:9092',
+ 'properties.group.id' = 'testGroup',
+ 'scan.startup.mode' = 'latest-offset',
+ 'format' = 'json'
+);
+
+CREATE TABLE infos (
+ `id` BIGINT,
+ `age` BIGINT,
+ `ts` STRING
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'info',
+ 'properties.bootstrap.servers' = 'XX.XX.XX.XX:9092',
+ 'properties.group.id' = 'testGroup',
+ 'scan.startup.mode' = 'latest-offset',
+ 'format' = 'json'
+);
+
+CREATE TABLE stream2_join_result (
+ id BIGINT ,
+ name STRING,
+ age BIGINT,
+ ts1 STRING ,
+ ts2 STRING,
+ PRIMARY KEY(id) NOT ENFORCED
+) WITH (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://XX.XX.XX.XX:3306/testDB',
+ 'username' = 'root',
+ 'password' = 'taia@2021',
+ 'table-name' = 'stream2_join_result'
+);
+
+insert into stream2_join_result select basic.id, basic.name, infos.age,basic.ts,infos.ts
+from basic join infos on (basic.id = infos.id) where TO_TIMESTAMP(basic.ts,'yyyy-MM-dd HH:mm:ss')
+BETWEEN TO_TIMESTAMP(infos.ts,'yyyy-MM-dd HH:mm:ss') - INTERVAL '10' SECOND AND TO_TIMESTAMP(infos.ts,'yyyy-MM-dd HH:mm:ss') + INTERVAL '10' SECOND;
+```
+
+```bash
+bin/start-seatunnel-sql.sh -m yarn-cluster --config config/intervaljoin.sql.conf
+```
+
+2. How to implement a Flink SQL dimension table join (using MySQL) with the SeaTunnel flink-sql module
+
+`dimjoin.sql.conf`
+
+```sql
+CREATE TABLE code_set_street (
+ area_code STRING,
+ area_name STRING,
+ town_code STRING ,
+ town_name STRING ,
+ PRIMARY KEY(town_code) NOT ENFORCED
+) WITH (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://XX.XX.XX.XX:3306/testDB',
+ 'username' = 'root',
+ 'password' = '2021',
+ 'table-name' = 'code_set_street',
+ 'lookup.cache.max-rows' = '5000' ,
+ 'lookup.cache.ttl' = '5min'
+);
+
+CREATE TABLE people (
+ `id` STRING,
+ `name` STRING,
+ `ts` TimeStamp(3) ,
+ proctime AS PROCTIME()
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'people',
+ 'properties.bootstrap.servers' = 'XX.XX.XX.XX:9092',
+ 'properties.group.id' = 'testGroup',
+ 'scan.startup.mode' = 'latest-offset',
+ 'format' = 'json'
+);
+
+CREATE TABLE mysql_dim_join_result (
+ id STRING ,
+ name STRING,
+ area_name STRING,
+ town_code STRING ,
+ town_name STRING,
+ ts TimeStamp ,
+ PRIMARY KEY(id,town_code) NOT ENFORCED
+) WITH (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://XX.XX.XX.XX:3306/testDB',
+ 'username' = 'root',
+ 'password' = '2021',
+ 'table-name' = 'mysql_dim_join_result'
+);
+
+insert into mysql_dim_join_result
+select people.id , people.name ,code_set_street.area_name ,code_set_street.town_code, code_set_street.town_name , people.ts
+from people inner join code_set_street FOR SYSTEM_TIME AS OF people.proctime
+on (people.id = code_set_street.town_code);
+```
+
+```bash
+bin/start-seatunnel-sql.sh -m yarn-cluster --config config/dimjoin.sql.conf
+```
+
+3. How to implement a Flink SQL CDC dimension table join (using mysql-cdc) with the SeaTunnel flink-sql module
+
+##### First, create the result table in the MySQL database
+
+```sql
+CREATE TABLE `dim_cdc_join_result` (
+ `id` varchar(255) NOT NULL,
+ `name` varchar(255) DEFAULT NULL,
+ `area_name` varchar(255) NOT NULL,
+ `town_code` varchar(255) NOT NULL,
+ `town_name` varchar(255) DEFAULT NULL,
+ `ts` varchar(255) DEFAULT NULL,
+ PRIMARY KEY (`id`,`town_code`,`ts`) USING BTREE
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;
+```
+
+`cdcjoin.sql.conf`
+
+```sql
+CREATE TABLE code_set_street_cdc (
+ area_code STRING,
+ area_name STRING,
+ town_code STRING ,
+ town_name STRING ,
+ PRIMARY KEY(town_code) NOT ENFORCED
+) WITH (
+ 'connector' = 'mysql-cdc',
+ 'hostname' = 'XX.XX.XX.XX',
+ 'port' = '3306',
+ 'username' = 'root',
+ 'password' = '2021',
+ 'database-name' = 'flink',
+ 'table-name' = 'code_set_street'
+);
+
+CREATE TABLE people (
+ `id` STRING,
+ `name` STRING,
+ `ts` STRING
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'people',
+ 'properties.bootstrap.servers' = 'XX.XX.XX.XX:9092',
+ 'properties.group.id' = 'testGroup',
+ 'scan.startup.mode' = 'latest-offset',
+ 'format' = 'json'
+);
+
+# create mysql sink table in flink
+CREATE TABLE dim_cdc_join_result (
+ id STRING ,
+ name STRING,
+ area_name STRING,
+ town_code STRING ,
+ town_name STRING,
+ ts STRING ,
+ PRIMARY KEY(id,town_code) NOT ENFORCED
+) WITH (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://XX.XX.XX.XX:3306/flink',
+ 'username' = 'root',
+ 'password' = '2021',
+ 'table-name' = 'dim_cdc_join_result'
+);
+
+insert into dim_cdc_join_result
+select a.id , a.name ,b.area_name ,b.town_code, b.town_name , a.ts
+from people a inner join code_set_street_cdc b on (a.id = b.town_code);
+```
+
+```bash
+bin/start-seatunnel-sql.sh -m yarn-cluster --config config/cdcjoin.sql.conf
+```
\ No newline at end of file
diff --git a/docs/sidebars.js b/docs/sidebars.js
index 170874ed2..ef8c73158 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -118,6 +118,24 @@ const sidebars = {
},
],
},
+ {
+ type: 'category',
+ label: 'flink-sql',
+ link: {
+ type: 'generated-index',
+ title: 'Flink-sql of SeaTunnel',
+ description: 'List all flink-sql supported Apache SeaTunnel for now.',
+ slug: '/category/flink-sql',
+ keywords: ['flink-sql'],
+ image: '/img/favicon.ico',
+ },
+ items: [
+ {
+ type: 'autogenerated',
+ dirName: 'connector/flink-sql',
+ },
+ ],
+ },
],
},
{