You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/06/24 10:31:16 UTC

[incubator-seatunnel] branch dev updated: [Feature][Doc] Add Flink SQL module to website. #2021 (#2056)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c83a02b6b [Feature][Doc] Add Flink SQL module to website. #2021 (#2056)
c83a02b6b is described below

commit c83a02b6b146b00e3f6c99699019ea93dcf3a6af
Author: Wanghuan <im...@126.com>
AuthorDate: Fri Jun 24 18:31:10 2022 +0800

    [Feature][Doc] Add Flink SQL module to website. #2021 (#2056)
    
    * docs error
    
    * [Feature][Doc] Add Flink SQL module to website. #2021
    
    * [Feature][Doc] Add Flink SQL module to website. #2021
    
    Co-authored-by: wanghuan2054 <49...@qq.com>
---
 docs/en/connector/flink-sql/usage.md | 275 +++++++++++++++++++++++++++++++++++
 docs/sidebars.js                     |  18 +++
 2 files changed, 293 insertions(+)

diff --git a/docs/en/connector/flink-sql/usage.md b/docs/en/connector/flink-sql/usage.md
new file mode 100644
index 000000000..3ffb93d9f
--- /dev/null
+++ b/docs/en/connector/flink-sql/usage.md
@@ -0,0 +1,275 @@
+# How to use flink sql module
+
+## Usage
+
+### 1. Command Entrypoint
+
+```bash
+bin/start-seatunnel-sql.sh
+```
+
+### 2. seatunnel config
+
+Change the file flink.sql.conf.template in the config/ directory to flink.sql.conf
+
+```bash
+mv flink.sql.conf.template flink.sql.conf
+```
+
+Prepare a seatunnel config file with the following content:
+
+```sql
+SET table.dml-sync = true;
+
+CREATE TABLE events (
+  f_type INT,
+  f_uid INT,
+  ts AS localtimestamp,
+  WATERMARK FOR ts AS ts
+) WITH (
+  'connector' = 'datagen',
+  'rows-per-second'='5',
+  'fields.f_type.min'='1',
+  'fields.f_type.max'='5',
+  'fields.f_uid.min'='1',
+  'fields.f_uid.max'='1000'
+);
+
+CREATE TABLE print_table (
+  type INT,
+  uid INT,
+  lstmt TIMESTAMP
+) WITH (
+  'connector' = 'print',
+  'sink.parallelism' = '1'
+);
+
+INSERT INTO print_table SELECT * FROM events where f_type = 1;
+```
+
+### 3. run job
+
+#### Standalone Cluster
+
+```bash
+bin/start-seatunnel-sql.sh --config config/flink.sql.conf
+
+# -p 2 specifies that the parallelism of flink job is 2. You can also specify more parameters, use flink run -h to view
+bin/start-seatunnel-flink.sh \
+-p 2 \
+--config config/flink.sql.conf
+```
+
+#### Yarn Cluster
+
+```bash
+bin/start-seatunnel-sql.sh -m yarn-cluster --config config/flink.sql.conf
+
+bin/start-seatunnel-sql.sh -t yarn-per-job --config config/flink.sql.conf
+
+# -p 2 specifies that the parallelism of flink job is 2. You can also specify more parameters, use flink run -h to view
+bin/start-seatunnel-flink.sh \
+-p 2 \
+-m yarn-cluster \
+--config config/flink.sql.conf
+```
+
+#### Other Options
+
+* `-p 2` specifies that the job parallelism is `2`
+
+```bash
+bin/start-seatunnel-sql.sh -p 2 --config config/flink.sql.conf
+```
+
+## Example
+
+1. How to implement flink sql interval join with seatunnel flink-sql module
+
+intervaljoin.sql.conf
+
+```hocon
+CREATE TABLE basic (
+  `id` BIGINT,
+  `name` STRING,
+   `ts`  STRING
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'basic',
+  'properties.bootstrap.servers' = 'XX.XX.XX.XX:9092',
+  'properties.group.id' = 'testGroup',
+  'scan.startup.mode' = 'latest-offset',
+  'format' = 'json'
+);
+
+CREATE TABLE infos (
+  `id` BIGINT,
+  `age` BIGINT,
+   `ts`  STRING
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'info',
+  'properties.bootstrap.servers' = 'XX.XX.XX.XX:9092',
+  'properties.group.id' = 'testGroup',
+  'scan.startup.mode' = 'latest-offset',
+  'format' = 'json'
+);
+
+CREATE TABLE stream2_join_result (
+  id BIGINT , 
+  name STRING,
+  age BIGINT,
+  ts1 STRING , 
+  ts2 STRING,
+  PRIMARY KEY(id) NOT ENFORCED
+) WITH (
+  'connector' = 'jdbc',
+  'url' = 'jdbc:mysql://XX.XX.XX.XX:3306/testDB',
+  'username' = 'root',
+  'password' = 'taia@2021',
+  'table-name' = 'stream2_join_result'
+);
+
+insert into  stream2_join_result select basic.id, basic.name, infos.age,basic.ts,infos.ts 
+from basic join infos on (basic.id = infos.id) where  TO_TIMESTAMP(basic.ts,'yyyy-MM-dd HH:mm:ss') 
+BETWEEN   TO_TIMESTAMP(infos.ts,'yyyy-MM-dd HH:mm:ss')  - INTERVAL '10' SECOND AND  TO_TIMESTAMP(infos.ts,'yyyy-MM-dd HH:mm:ss') + INTERVAL '10' SECOND;
+```
+
+```bash
+bin/start-seatunnel-sql.sh -m yarn-cluster --config config/intervaljoin.sql.conf
+```
+
+2. How to implement flink sql dim join (using mysql) with seatunnel flink-sql module
+
+dimjoin.sql.conf
+
+```hocon
+CREATE TABLE code_set_street (
+  area_code STRING,
+  area_name STRING,
+  town_code STRING ,
+  town_name STRING ,
+  PRIMARY KEY(town_code) NOT ENFORCED
+) WITH (
+  'connector' = 'jdbc',
+  'url' = 'jdbc:mysql://XX.XX.XX.XX:3306/testDB',
+  'username' = 'root',
+  'password' = '2021',
+  'table-name' = 'code_set_street',
+  'lookup.cache.max-rows' = '5000' ,
+  'lookup.cache.ttl' = '5min'
+);
+
+CREATE TABLE people (
+  `id` STRING,
+  `name` STRING,
+  `ts`  TimeStamp(3) ,
+  proctime AS PROCTIME() 
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'people',
+  'properties.bootstrap.servers' = 'XX.XX.XX.XX:9092',
+  'properties.group.id' = 'testGroup',
+  'scan.startup.mode' = 'latest-offset',
+  'format' = 'json'
+);
+
+CREATE TABLE mysql_dim_join_result (
+  id STRING , 
+  name STRING,
+  area_name STRING,
+  town_code STRING , 
+  town_name STRING,
+  ts TimeStamp ,
+  PRIMARY KEY(id,town_code) NOT ENFORCED
+) WITH (
+  'connector' = 'jdbc',
+  'url' = 'jdbc:mysql://XX.XX.XX.XX:3306/testDB',
+  'username' = 'root',
+  'password' = '2021',
+  'table-name' = 'mysql_dim_join_result'
+);
+
+insert into mysql_dim_join_result
+select people.id , people.name ,code_set_street.area_name ,code_set_street.town_code, code_set_street.town_name , people.ts  
+from people inner join code_set_street FOR SYSTEM_TIME AS OF  people.proctime  
+on (people.id = code_set_street.town_code);
+```
+
+```bash
+bin/start-seatunnel-sql.sh -m yarn-cluster --config config/dimjoin.sql.conf
+```
+
+3. How to implement flink SQL cdc dim join (using mysql-cdc) with seatunnel flink-sql module
+
+##### First , Need create mysql table in mysql database
+
+```
+CREATE TABLE `dim_cdc_join_result` (
+    `id` varchar(255) NOT NULL,
+    `name` varchar(255) DEFAULT NULL,
+    `area_name` varchar(255) NOT NULL,
+    `town_code` varchar(255) NOT NULL,
+    `town_name` varchar(255) DEFAULT NULL,
+    `ts` varchar(255) DEFAULT NULL,
+    PRIMARY KEY (`id`,`town_code`,`ts`) USING BTREE
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;
+```
+
+cdcjoin.sql.conf
+
+```hocon
+CREATE TABLE code_set_street_cdc (
+  area_code STRING,
+  area_name STRING,
+  town_code STRING ,
+  town_name STRING ,
+  PRIMARY KEY(town_code) NOT ENFORCED
+) WITH (
+  'connector' = 'mysql-cdc',
+  'hostname' = 'XX.XX.XX.XX',
+  'port' = '3306',
+  'username' = 'root',
+  'password' = '2021',
+  'database-name' = 'flink',
+  'table-name' = 'code_set_street'
+);
+     
+CREATE TABLE people (
+  `id` STRING,
+  `name` STRING,
+  `ts`  STRING
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'people',
+  'properties.bootstrap.servers' = 'XX.XX.XX.XX:9092',
+  'properties.group.id' = 'testGroup',
+  'scan.startup.mode' = 'latest-offset',
+  'format' = 'json'
+);
+
+# create mysql sink table in flink
+CREATE TABLE dim_cdc_join_result (
+  id STRING , 
+  name STRING,
+  area_name STRING,
+  town_code STRING , 
+  town_name STRING,
+  ts STRING ,
+  PRIMARY KEY(id,town_code) NOT ENFORCED
+) WITH (
+  'connector' = 'jdbc',
+  'url' = 'jdbc:mysql://XX.XX.XX.XX:3306/flink',
+  'username' = 'root',
+  'password' = '2021',
+  'table-name' = 'dim_cdc_join_result'
+);
+ 
+insert into dim_cdc_join_result
+select a.id , a.name ,b.area_name ,b.town_code, b.town_name , a.ts  
+from people a inner join code_set_street_cdc b  on (a.id = b.town_code);
+```
+
+```bash
+bin/start-seatunnel-sql.sh -m yarn-cluster --config config/cdcjoin.sql.conf
+```
\ No newline at end of file
diff --git a/docs/sidebars.js b/docs/sidebars.js
index 170874ed2..ef8c73158 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -118,6 +118,24 @@ const sidebars = {
             },
           ],
         },
+        {
+           type: 'category',
+           label: 'flink-sql',
+           link: {
+             type: 'generated-index',
+             title: 'Flink-sql of SeaTunnel',
+             description: 'List all flink-sql supported Apache SeaTunnel for now.',
+             slug: '/category/flink-sql',
+             keywords: ['flink-sql'],
+             image: '/img/favicon.ico',
+           },
+           items: [
+             {
+               type: 'autogenerated',
+               dirName: 'connector/flink-sql',
+             },
+           ],
+        },
       ],
     },
     {