Posted to user@flink.apache.org by vtygoss <vt...@126.com> on 2021/12/13 09:10:50 UTC

using flink retract stream and rocksdb, too many intermediate results cause checkpoints too heavy to finish

Hi, community!


I have run into a problem while building a streaming production pipeline that uses Flink retract streams, Hudi-HDFS/Kafka as the storage engines, and RocksDB as the state backend.


In my scenario,
- During a patient's hospitalization, multiple measurements of vital signs are recorded, including temperature, pulse, blood pressure and so on.
- Each type of vital sign has 20 or more records with PRIMARY KEY(patientId, visitId, signType, time) in the table tbl_vis_vital_signs used in the code below.


I need to aggregate all the vital sign records together through JOIN or COLLECT with FILTER, as in the code below.


```
select pid, vid,
  collect(ROW(..., temperature, ...)) filter (where signType = 'temprature') as temprature,
  collect(ROW(..., pulse, ...)) filter (where signType = 'pulse') as pulse,
  collect(....) filter (where ...) as bloodpressure
from tbl_vis_vital_signs
group by pid, vid
```


With the help of Flink CDC and Kafka/Hudi-HDFS, we want to build a streaming production pipeline with the data flow below.


DataBase    --[CDC tools]-->   Kafka     --[sync]-->     Dynamic Table(kafka/hudi-hdfs)  --Flink SQL(retract stream) --> Dynamic Table 
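
For illustration, a DDL sketch of the Kafka-backed dynamic table in this flow could look like the one below; the column types, topic name and bootstrap servers are placeholders, not our real configuration.

```
-- Sketch only: column types, topic and bootstrap servers are illustrative.
CREATE TABLE tbl_vis_vital_signs (
  patientId   STRING,
  visitId     STRING,
  signType    STRING,
  `time`      TIMESTAMP(3),
  temperature DOUBLE,
  pulse       DOUBLE,
  PRIMARY KEY (patientId, visitId, signType, `time`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'vis_vital_signs',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
```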


The problem is caused by three factors:
1. Data inflation:
1) major: Changes in the input tables cause corresponding changes in the output table, e.g. for joins and aggregations. In the query above, every change to a row in tbl_vis_vital_signs retracts the out-dated result containing all of the vital signs' info and sends a new result (see the sketch after this list). Worse, there are many vital sign records per hospitalization, which leads to a large number of retract and re-send operations that have to be consumed by all downstream operators.
2) minor: Each CDC update event is split into two events: a deletion of the old record and an insertion of the new record.
2. Kafka / Hudi-HDFS / RocksDB append incremental data to the full data set:
1) RocksDB and Hudi-HDFS use an LSM-like incremental model; they append incremental events to the full data set, whether they are insertions or deletions.
2) Even upsert-kafka implements deletion by inserting tombstones.
3. Row-Based Storage
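
To make factor 1 concrete, here is an illustrative sketch (the values are made up) of the changelog the aggregation emits when a single new pulse reading arrives for one (pid, vid):

```
-- Illustrative only: one new pulse row retracts the previous aggregate and
-- re-emits the full new aggregate for that (pid, vid).
-U[pid=1, vid=9, temprature=[...], pulse=[80, 82],     bloodpressure=[...]]
+U[pid=1, vid=9, temprature=[...], pulse=[80, 82, 85], bloodpressure=[...]]
```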


In my scenario, these factors cause the following problems:
1. A large number of low-value intermediate results per primary key consume the throughput of the Flink application.
2. Heavy checkpoints: in every checkpoint (aligned, every 10 seconds), the incremental block data of RocksDB is several GB, and the checkpoint takes several minutes to complete, if it completes at all. Yet only a few GB of data exist in the HDFS checkpoint directory.
3. Low application performance and low stability of the TaskManager JVM.


So, would mini-batch bring an improvement in this scenario?
Thanks for any reply or suggestions.


Best Regards!


2021-12-13 17:10:00

Re: using flink retract stream and rocksdb, too many intermediate results cause checkpoints too heavy to finish

Posted by Arvid Heise <ar...@apache.org>.
Can you please describe your actual use case? What do you want to achieve,
low latency or high throughput? What are the consumers of the produced
dataset?

It sounds to me as if this is classical sensor aggregation. I have not
heard of any sensor aggregation that doesn't use windowing. So you'd
usually include a TUMBLE window of 10s and output the data in small
batches. This would significantly reduce the pressure on the sink and may
already solve some of your problems.
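
A rough sketch of such a windowed query, assuming tbl_vis_vital_signs has an event-time attribute (here called `ts`) with a watermark defined on it, could be:

```
-- Sketch under assumptions: `ts` is an event-time attribute with a watermark;
-- the other column names follow the original query.
SELECT
  pid, vid,
  TUMBLE_START(ts, INTERVAL '10' SECOND) AS window_start,
  COLLECT(temperature) FILTER (WHERE signType = 'temprature') AS temprature,
  COLLECT(pulse)       FILTER (WHERE signType = 'pulse')      AS pulse
FROM tbl_vis_vital_signs
GROUP BY pid, vid, TUMBLE(ts, INTERVAL '10' SECOND)
```

Each window then emits one final, append-only result per key instead of a retraction and re-send for every input change.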

On Tue, Dec 14, 2021 at 4:29 AM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> Changes of input tables will cause corresponding changes in output table
>
>
> Which sink are you using? If it is an upsert sink then Flink SQL planner
> will filter out UPDATE_BEFORE messages automatically. Also if your sink
> supports something like "ignore delete messages" it can also filter out
> delete messages and affect the downstream less.
>
> Mini-batch will also help in this case. If mini-batch is enabled,
> aggregations will only send updates to the downstream once per batch, thus
> decreasing the number of records flowing to downstream.
>
> For better performance on aggregations you can also try local-global
> aggregations. See [1] for details.
>
> Row-Based Storage
>
>
> This depends on the format you use. Although Flink's current calculation
> model is row-based, it still supports column-based format like parquet and
> has a number of optimizations on it. If you enable mini-batch and
> two-staged aggregations most job will meet their performance needs.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation
>
> vtygoss <vt...@126.com> 于2021年12月13日周一 17:13写道:
>
>> Hi, community!
>>
>>
>> I meet a problem in the procedure of building a streaming production
>> pipeline using Flink retract stream and hudi-hdfs/kafka as storage engine
>> and rocksdb as statebackend.
>>
>>
>> In my scenario,
>>
>> - During a patient's hospitalization, multiple measurements of vital
>> signs are recorded, including temperature, pulse, blood pressure and so on.
>>
>> - Each type of vital sign contains 20+ or more records with PRIMARY
>> KEY(patientId, visitId, signType, time) in table tbl_vis_vital_signs
>> mentioned in below code.
>>
>>
>> And, i need to get all the vital sign records aggregations together
>> through JOIN or COLLECT with FILTER, as code below.
>>
>>
>> ```
>>
>> select pid, vid,
>>
>> collect(ROW(..., temperature,...)) filter(where signType='temprature') as
>> temprature,
>>
>> collect(ROW(..., pulse,..))filter(where signType='pulse') as pulse,
>>
>> collect(....) filter(where ...) as bloodpressure
>>
>> from tbl_vis_vital_signs
>>
>> group by pid, vid
>>
>> ```
>>
>>
>> With the help of FlinkCDC and Kafka/Hudi-Hdfs, we want to build streaming
>> production pipeline, as the data flow below.
>>
>>
>> DataBase    --[CDC tools]-->   Kafka     --[sync]-->     Dynamic
>> Table(kafka/hudi-hdfs)  --Flink SQL(retract stream) --> Dynamic Table
>>
>>
>> The problem is contributed by three factors as following.
>>
>> 1. Data Inflations:
>>
>> 1) major: Changes of input tables will cause corresponding changes in
>> output table, e.g. join, aggregation. In the code above, every change of
>> each row in tbl_vis_vital_signs will retract the out-dated result full of
>> all vital signs' info and send new result. More serious, there are many
>> vital sign records during per hospitalization, and cause too many times of
>> retract and re-send operations which will be consumed by all downstreams.
>>
>> 2) minor: Each cdc update event will be split in two event: deletion of
>> old record and insertion of new record.
>>
>> 2. Kafka / Hudi-HDFS / RocksDB Append incremental data to full data:
>>
>> 1) RocksDB and Hudi-HDFS use incremental model like LSM, they append
>> incremental events to full, no matter insertion or deletion.
>>
>> 2) Even upsert-kafka, is implemented by inserting tombstones.
>>
>> 3. Row-Based Storage
>>
>>
>> In my scenario, these factors will cause problems:
>>
>> 1. A large number of low meaning intermediate results of PrimaryKey
>> consume throughput of Flink Application.
>>
>> 2. Heavy checkpoint: In every checkpoint(aligned, every 10 sec),
>> the incremental block data of rocksdb is over a few of GB, and it takes
>> over a few minutes if succussfully. But only a few GB data exists in HDFS
>> checkpoint directory.
>>
>> 3. Low performance of application and low stablity of TaskManager JVM.
>>
>>
>> So, does mini-batch have an improvement of this scenario?
>>
>> Thanks for your any reply or suggestions.
>>
>>
>> Best Regards!
>>
>>
>> 2021-12-13 17:10:00
>>
>>
>>
>>
>>

Re: using flink retract stream and rocksdb, too many intermediate results cause checkpoints too heavy to finish

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Changes of input tables will cause corresponding changes in output table


Which sink are you using? If it is an upsert sink then Flink SQL planner
will filter out UPDATE_BEFORE messages automatically. Also if your sink
supports something like "ignore delete messages" it can also filter out
delete messages and affect the downstream less.

Mini-batch will also help in this case. If mini-batch is enabled,
aggregations will only send updates to the downstream once per batch, thus
decreasing the number of records flowing downstream.

For better performance on aggregations you can also try local-global
aggregations. See [1] for details.
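
As a sketch, both features can be enabled through the table configuration, for example in the SQL client (the exact SET quoting varies slightly between Flink versions, and the latency and batch-size values below are just examples to tune for your workload):

```
-- Illustrative settings for mini-batch and two-phase (local-global) aggregation.
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '5000';
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
```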

Row-Based Storage


This depends on the format you use. Although Flink's current calculation
model is row-based, it still supports column-based formats like Parquet and
has a number of optimizations for them. If you enable mini-batch and
two-phase aggregations, most jobs will meet their performance needs.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation

vtygoss <vt...@126.com> 于2021年12月13日周一 17:13写道:

> Hi, community!
>
>
> I meet a problem in the procedure of building a streaming production
> pipeline using Flink retract stream and hudi-hdfs/kafka as storage engine
> and rocksdb as statebackend.
>
>
> In my scenario,
>
> - During a patient's hospitalization, multiple measurements of vital signs
> are recorded, including temperature, pulse, blood pressure and so on.
>
> - Each type of vital sign contains 20+ or more records with PRIMARY
> KEY(patientId, visitId, signType, time) in table tbl_vis_vital_signs
> mentioned in below code.
>
>
> And, i need to get all the vital sign records aggregations together
> through JOIN or COLLECT with FILTER, as code below.
>
>
> ```
>
> select pid, vid,
>
> collect(ROW(..., temperature,...)) filter(where signType='temprature') as
> temprature,
>
> collect(ROW(..., pulse,..))filter(where signType='pulse') as pulse,
>
> collect(....) filter(where ...) as bloodpressure
>
> from tbl_vis_vital_signs
>
> group by pid, vid
>
> ```
>
>
> With the help of FlinkCDC and Kafka/Hudi-Hdfs, we want to build streaming
> production pipeline, as the data flow below.
>
>
> DataBase    --[CDC tools]-->   Kafka     --[sync]-->     Dynamic
> Table(kafka/hudi-hdfs)  --Flink SQL(retract stream) --> Dynamic Table
>
>
> The problem is contributed by three factors as following.
>
> 1. Data Inflations:
>
> 1) major: Changes of input tables will cause corresponding changes in
> output table, e.g. join, aggregation. In the code above, every change of
> each row in tbl_vis_vital_signs will retract the out-dated result full of
> all vital signs' info and send new result. More serious, there are many
> vital sign records during per hospitalization, and cause too many times of
> retract and re-send operations which will be consumed by all downstreams.
>
> 2) minor: Each cdc update event will be split in two event: deletion of
> old record and insertion of new record.
>
> 2. Kafka / Hudi-HDFS / RocksDB Append incremental data to full data:
>
> 1) RocksDB and Hudi-HDFS use incremental model like LSM, they append
> incremental events to full, no matter insertion or deletion.
>
> 2) Even upsert-kafka, is implemented by inserting tombstones.
>
> 3. Row-Based Storage
>
>
> In my scenario, these factors will cause problems:
>
> 1. A large number of low meaning intermediate results of PrimaryKey
> consume throughput of Flink Application.
>
> 2. Heavy checkpoint: In every checkpoint(aligned, every 10 sec),
> the incremental block data of rocksdb is over a few of GB, and it takes
> over a few minutes if succussfully. But only a few GB data exists in HDFS
> checkpoint directory.
>
> 3. Low performance of application and low stablity of TaskManager JVM.
>
>
> So, does mini-batch have an improvement of this scenario?
>
> Thanks for your any reply or suggestions.
>
>
> Best Regards!
>
>
> 2021-12-13 17:10:00
>
>
>
>
>