Posted to dev@flink.apache.org by "Carl (Jira)" <ji...@apache.org> on 2022/02/14 09:41:00 UTC
[jira] [Created] (FLINK-26123) Data from an upsert-kafka source cannot be written to HBase when the SQL has a WHERE condition
Carl created FLINK-26123:
----------------------------
Summary: Data from an upsert-kafka source cannot be written to HBase when the SQL has a WHERE condition
Key: FLINK-26123
URL: https://issues.apache.org/jira/browse/FLINK-26123
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.12.1
Reporter: Carl
Attachments: image-2022-02-14-16-43-13-172.png, image-2022-02-14-16-47-26-820.png, image-2022-02-14-17-14-43-158.png, image-2022-02-14-17-15-27-611.png, image-2022-02-14-17-18-23-864.png, image-2022-02-14-17-20-04-228.png, image-2022-02-14-17-30-02-525.png, image-2022-02-14-17-32-40-475.png
*1. source table:*
*(1) kafka topic:*
./kafka-topics.sh --create --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --replication-factor 2 --partitions 2 --topic sink-hbase-where-01
*(2) flink kafka table:*
CREATE TABLE hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source (
  id string
  , ver int
  , dp_last_chg_time timestamp(3)
  , kafka_ts timestamp(3) METADATA FROM 'timestamp' VIRTUAL
  , load_ts AS PROCTIME()
  , ts AS dp_last_chg_time
  , WATERMARK FOR dp_last_chg_time AS dp_last_chg_time - INTERVAL '20' SECOND
  , PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sink-hbase-where-01',
  'properties.group.id' = 'sink-hbase-where-group-01',
  'properties.zookeeper.connect' = '...',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',
  'value.format' = 'json',
  'value.json.ignore-parse-errors' = 'true',
  'value.fields-include' = 'ALL'
);
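With 'key.format' = 'json' and 'value.fields-include' = 'ALL', each Kafka record carries the primary key as JSON in the record key and all columns (including the key column) as JSON in the record value. Using the first test message below as an example, a record looks like:
key:   {"id":"555"}
value: {"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}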
*2. sink table:*
CREATE TABLE hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink (
  `pk` STRING
  , info1 ROW<ver string, dp_last_chg_time string, kafka_ts string, load_ts string>
  , PRIMARY KEY (`pk`) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'sink-hbase-where',
  'sink.buffer-flush.max-size' = '0',
  'sink.buffer-flush.max-rows' = '0',
  'zookeeper.quorum' = '...'
);
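Note: the HBase table itself must already exist with a column family matching the ROW field name. It was presumably created along these lines in the HBase shell (a sketch; the table and family names are taken from the DDL above):
hbase> create 'sink-hbase-where', 'info1'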
*3. flink sql:*
insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
  select cast(t1.id as string) as keys
  , cast(t1.ver as string) as ver
  , cast(t1.ts as string) as dp_last_chg_time
  , cast(t1.kafka_ts as string) as kafka_ts
  , cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
  from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
  *where cast(t1.id as string) = '555'*
)
*4. test*
*test 1:*
(1) produce a Kafka message (key, value; see the producer sketch after this test):
{"id":"555"}, {"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
(2) scan the HBase table:
!image-2022-02-14-17-14-43-158.png!
(3) produce a Kafka message:
{"id":"555"}, {"id":"555","ver":2,"dp_last_chg_time":"2022-02-14 12:04:00"}
(4) scan the HBase table:
!image-2022-02-14-17-15-27-611.png!
(5) produce a Kafka message:
{"id":"666"}, {"id":"666","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
(6) scan the HBase table:
!image-2022-02-14-17-18-23-864.png!
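The keyed JSON messages above can be produced with the Kafka console producer; a sketch (the broker address and the '|' key separator are assumptions, not from the original report):
./kafka-console-producer.sh --broker-list kafka01:9092 --topic sink-hbase-where-01 --property parse.key=true --property 'key.separator=|'
> {"id":"555"}|{"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}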
*test 2:*
(1) cancel the Flink app in IDEA
(2) truncate the HBase table (see the shell sketch after this test):
!image-2022-02-14-16-47-26-820.png!
(3) run the Flink app in IDEA
(4) scan the HBase table:
No data was written to HBase.
!image-2022-02-14-17-20-04-228.png!
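The truncate and scan steps were presumably run in the HBase shell, e.g.:
hbase> truncate 'sink-hbase-where'
hbase> scan 'sink-hbase-where'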
*test 3:*
(1) cancel the Flink app in IDEA
(2) remove the where condition:
insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
  select cast(t1.id as string) as keys
  , cast(t1.ver as string) as ver
  , cast(t1.ts as string) as dp_last_chg_time
  , cast(t1.kafka_ts as string) as kafka_ts
  , cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
  from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
  -- where cast(t1.id as string) = '555'
)
(3) run the Flink app in IDEA
(4) scan the HBase table:
the data was written to HBase:
!image-2022-02-14-17-30-02-525.png!
*test 4:*
(1) cancel the Flink app in IDEA
(2) update the where condition to select '666':
insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
  select cast(t1.id as string) as keys
  , cast(t1.ver as string) as ver
  , cast(t1.ts as string) as dp_last_chg_time
  , cast(t1.kafka_ts as string) as kafka_ts
  , cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
  from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
  *where cast(t1.id as string) = '666'*
)
(3) run the Flink app in IDEA
(4) scan the HBase table:
the '666' data was written to HBase:
!image-2022-02-14-17-32-40-475.png!
From the test results:
(1) there are two messages with primary key '555'
{"id":"555"}, {"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
{"id":"555"}, {"id":"555","ver":2,"dp_last_chg_time":"2022-02-14 12:04:00"}
(2) there is one message with primary key '666'
{"id":"666"}, {"id":"666","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
*re-running the flink sql in IDEA leads to the following conclusions:*
(1) With the following SQL, data for both keys is written:
insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
  select cast(t1.id as string) as keys
  , cast(t1.ver as string) as ver
  , cast(t1.ts as string) as dp_last_chg_time
  , cast(t1.kafka_ts as string) as kafka_ts
  , cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
  from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
)
(2) With the following SQL, no data is written:
insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
  select cast(t1.id as string) as keys
  , cast(t1.ver as string) as ver
  , cast(t1.ts as string) as dp_last_chg_time
  , cast(t1.kafka_ts as string) as kafka_ts
  , cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
  from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
  *where cast(t1.id as string) = '555'*
)
(3) With the following SQL, the '666' data is written:
insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
  select cast(t1.id as string) as keys
  , cast(t1.ver as string) as ver
  , cast(t1.ts as string) as dp_last_chg_time
  , cast(t1.kafka_ts as string) as kafka_ts
  , cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
  from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
  *where cast(t1.id as string) = '666'*
)