Posted to dev@flink.apache.org by "Carl (Jira)" <ji...@apache.org> on 2022/02/14 09:41:00 UTC

[jira] [Created] (FLINK-26123) The data of the upsert-kafka source cannot be written to HBase under sql where conditions

Carl created FLINK-26123:
----------------------------

             Summary: The data of the upsert-kafka source cannot be written to HBase under sql where conditions
                 Key: FLINK-26123
                 URL: https://issues.apache.org/jira/browse/FLINK-26123
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.12.1
            Reporter: Carl
         Attachments: image-2022-02-14-16-43-13-172.png, image-2022-02-14-16-47-26-820.png, image-2022-02-14-17-14-43-158.png, image-2022-02-14-17-15-27-611.png, image-2022-02-14-17-18-23-864.png, image-2022-02-14-17-20-04-228.png, image-2022-02-14-17-30-02-525.png, image-2022-02-14-17-32-40-475.png

*1. source table :*

*(1) kafka topic :*

./kafka-topics.sh --create --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --replication-factor 2 --partitions 2 --topic sink-hbase-where-01

*(2) flink kafka table :*

create table hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source(
    id         string
    , ver      int
    , dp_last_chg_time    timestamp(3)
    , kafka_ts            timestamp(3) METADATA FROM 'timestamp' VIRTUAL
    , load_ts             AS PROCTIME()
    , ts                  as dp_last_chg_time
    , WATERMARK FOR dp_last_chg_time AS dp_last_chg_time - INTERVAL '20' SECOND
    , PRIMARY KEY (id) not enforced
)WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'sink-hbase-where-01',
    'properties.group.id' = 'sink-hbase-where-group-01',
    'properties.zookeeper.connect' = '...',
    'properties.bootstrap.servers' = '...',
    'key.format' = 'json',
    'key.json.ignore-parse-errors' = 'true',
    'value.format' = 'json',
    'value.json.ignore-parse-errors' = 'true',
    'value.fields-include' = 'ALL'
);

 

*2. sink table :*

CREATE TABLE hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink 
(
    `pk` STRING
    , info1 ROW<ver string, dp_last_chg_time string, kafka_ts string, load_ts string>
    , PRIMARY KEY (`pk`) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'sink-hbase-where',
    'sink.buffer-flush.max-size' = '0',
    'sink.buffer-flush.max-rows' ='0',
    'zookeeper.quorum' = '...'
);

 

*3. flink sql :* 

insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
    select cast(t1.id as string) as keys, cast(t1.ver as string) as ver, cast(t1.ts as string) as dp_last_chg_time, cast(t1.kafka_ts as string) as kafka_ts, cast(TIMESTAMPADD(HOUR,8,t1.load_ts) as string) as load_ts
    from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
    *where cast(t1.id as string) = '555'*
)

 

*4. test*

*test 1 :*

(1) produce kafka message:

{"id":"555"},{"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}

(2) scan hbase table :

!image-2022-02-14-17-14-43-158.png!

(3) produce kafka message:

{"id":"555"},{"id":"555","ver":2,"dp_last_chg_time":"2022-02-14 12:04:00"}

(4) scan hbase table :

!image-2022-02-14-17-15-27-611.png!

(5) produce kafka message:

{"id":"666"},{"id":"666","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}

(6) scan hbase table :

!image-2022-02-14-17-18-23-864.png!

 

*test 2 :*

(1) cancel the flink app in idea

(2) truncate hbase table :

!image-2022-02-14-16-47-26-820.png!

(3) run the flink app in idea 

(4) scan hbase table :

No data was written to HBase.

!image-2022-02-14-17-20-04-228.png!

 

*test 3 :*

(1) cancel the flink app in idea

(2) Delete where condition :

insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
    select cast(t1.id as string) as keys, cast(t1.ver as string) as ver, cast(t1.ts as string) as dp_last_chg_time, cast(t1.kafka_ts as string) as kafka_ts, cast(TIMESTAMPADD(HOUR,8,t1.load_ts) as string) as load_ts
    from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
    -- *where cast(t1.id as string) = '555'*
)

(3) run the flink app in idea 

(4) scan hbase table :

the data was written to HBase :

!image-2022-02-14-17-30-02-525.png!

 

*test 4 :*

(1) cancel the flink app in idea

(2) update the where condition :

insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
    select cast(t1.id as string) as keys, cast(t1.ver as string) as ver, cast(t1.ts as string) as dp_last_chg_time, cast(t1.kafka_ts as string) as kafka_ts, cast(TIMESTAMPADD(HOUR,8,t1.load_ts) as string) as load_ts
    from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
    *where cast(t1.id as string) = '666'*
)

(3) run the flink app in idea 

(4) scan hbase table :

the data was written to HBase :

!image-2022-02-14-17-32-40-475.png!

 

From the test results,

(1) there are two pieces of data with a primary key of '555'

{"id":"555"},{"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}

{"id":"555"},{"id":"555","ver":2,"dp_last_chg_time":"2022-02-14 12:04:00"}

(2) there is one piece of data with a primary key of '666'

{"id":"666"},{"id":"666","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
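
Each message above is written as a key, a comma, and then a value. A minimal Python sketch of that split follows, assuming the producer divides each line at the first comma (the report does not say which producer or separator setting was used). The helper name `split_key_value` is hypothetical.

```python
import json

# The three messages from the tests, written as "<key>,<value>".
LINES = [
    '{"id":"555"},{"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}',
    '{"id":"555"},{"id":"555","ver":2,"dp_last_chg_time":"2022-02-14 12:04:00"}',
    '{"id":"666"},{"id":"666","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}',
]


def split_key_value(line):
    """Split at the first comma (assumed separator) and parse both halves as JSON."""
    key_text, value_text = line.split(",", 1)
    return json.loads(key_text), json.loads(value_text)


records = [split_key_value(line) for line in LINES]
for key, value in records:
    # Print the primary key from the Kafka key and the version from the value.
    print(key["id"], value["ver"])
```

The key JSON contains no comma of its own, so splitting at the first comma separates key and value cleanly for these three messages.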

 

*Re-running the Flink SQL in IDEA leads to the following conclusions:*

(1) With the following SQL, the data for both keys ('555' and '666') is written

insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
    select cast(t1.id as string) as keys, cast(t1.ver as string) as ver, cast(t1.ts as string) as dp_last_chg_time, cast(t1.kafka_ts as string) as kafka_ts, cast(TIMESTAMPADD(HOUR,8,t1.load_ts) as string) as load_ts
    from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
)

(2) With the following SQL, no data is written

insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
    select cast(t1.id as string) as keys, cast(t1.ver as string) as ver, cast(t1.ts as string) as dp_last_chg_time, cast(t1.kafka_ts as string) as kafka_ts, cast(TIMESTAMPADD(HOUR,8,t1.load_ts) as string) as load_ts
    from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
    *where cast(t1.id as string) = '555'*
)

(3) With the following SQL, the '666' data is written

insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
    select cast(t1.id as string) as keys, cast(t1.ver as string) as ver, cast(t1.ts as string) as dp_last_chg_time, cast(t1.kafka_ts as string) as kafka_ts, cast(TIMESTAMPADD(HOUR,8,t1.load_ts) as string) as load_ts
    from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
    *where cast(t1.id as string) = '666'*
)
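
For comparison, the result one would expect from each of the three queries can be modeled as a plain upsert into a dictionary keyed by `id`. This is a simplified model of upsert semantics, not Flink's implementation; `expected_rows` and its `where_id` parameter are hypothetical names used only for illustration.

```python
# Messages in the order they were produced: (primary key, ver).
MESSAGES = [("555", 1), ("555", 2), ("666", 1)]


def expected_rows(messages, where_id=None):
    """Model the sink as a dict: latest value per key wins, optionally filtered by id."""
    rows = {}
    for key, ver in messages:
        if where_id is not None and key != where_id:
            continue  # row rejected by the WHERE condition
        rows[key] = ver  # upsert: overwrite any earlier version of the key
    return rows


print(expected_rows(MESSAGES))         # no WHERE condition
print(expected_rows(MESSAGES, "555"))  # where id = '555'
print(expected_rows(MESSAGES, "666"))  # where id = '666'
```

Under this model the '555' filter would leave one row with ver 2 in the sink, whereas the run reported in (2) wrote nothing to HBase.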
