You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@hudi.apache.org by vtygoss <vt...@126.com> on 2021/12/16 09:26:44 UTC

Re: using flink retract stream and rockdb, too many intermediateresult of values cause checkpoint too heavy to finish

Hi  Arvid Heise,


Thanks for your reply! It's not classical sensor aggregation.  


The reason for not using window join is the very long time gap between patient's behaviors. 


There is a long gap of days even months between the appointment of doctor and the visit, and between tests and between hospitalization and discharge. It's a little like a specail session window having a very long gap, but it won't be a time or number based window. 


> actual use case? 
The actual use cases are based on this scenario, like doctors, patients, orders, visits, tests, hospitalization, nursing notes and so on. 
> What do i want to acheive? 
As mentioned above, during a long time zone, dozens of events continue to arrive for each patients, especally testing and nursing records. I hope that when the new record comes, the old result will be updated automatically. And i also hope the delay of the retraction and the re-sendition can be within 10 minutes. 
> consumers of the produced dataset?
Data developers will build a data streaming production pipeline based on upstream datasets and produce new datasets; Data analysts will analyse data and model like the relationship between spending cost and medical outcomes; Doctor and nurse on duty will query all info of corresponding patient.   


Thanks for your any reply or suggestion. 


Best Regards!
2021-12-16 17:25:00


在 2021年12月16日 04:09,Arvid Heise<ar...@apache.org> 写道:


Can you please describe your actual use case? What do you want to achieve low-latency or high-throughput? What are the consumers of the produced dataset?



It sounds to me as if this is classical sensor aggregation. I have not heard of any sensor aggregation that doesn't use windowing. So you'd usually include a TUMBLE window of 10s and output the data in small batches. This would significantly reduce the pressure on the sink and may already solve some of your problems.



On Tue, Dec 14, 2021 at 4:29 AM Caizhi Weng <ts...@gmail.com> wrote:

Hi!


Changes of input tables will cause corresponding changes in output table


Which sink are you using? If it is an upsert sink then Flink SQL planner will filter out UPDATE_BEFORE messages automatically. Also if your sink supports something like "ignore delete messages" it can also filter out delete messages and affect the downstream less.


Mini-batch will also help in this case. If mini-batch is enabled, aggregations will only send updates to the downstream once per batch, thus decreasing the number of records flowing to downstream.


For better performance on aggregations you can also try local-global aggregations. See [1] for details.


Row-Based Storage


This depends on the format you use. Although Flink's current calculation model is row-based, it still supports column-based format like parquet and has a number of optimizations on it. If you enable mini-batch and two-staged aggregations most job will meet their performance needs.


[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation


vtygoss <vt...@126.com> 于2021年12月13日周一 17:13写道:

Hi, community!


I meet a problem in the procedure of building a streaming production pipeline using Flink retract stream and hudi-hdfs/kafka as storage engine and rocksdb as statebackend. 


In my scenario, 
- During a patient's hospitalization, multiple measurements of vital signs are recorded, including temperature, pulse, blood pressure and so on. 
- Each type of vital sign contains 20+ or more records with PRIMARY KEY(patientId, visitId, signType, time) in table tbl_vis_vital_signs mentioned in below code. 


And, i need to get all the vital sign records aggregations together through JOIN or COLLECT with FILTER, as code below. 


```
select pid, vid, 
collect(ROW(..., temperature,...)) filter(where signType='temprature') as temprature,
collect(ROW(..., pulse,..))filter(where signType='pulse') as pulse,
collect(....) filter(where ...) as bloodpressure
from tbl_vis_vital_signs 
group by pid, vid
```


With the help of FlinkCDC and Kafka/Hudi-Hdfs, we want to build streaming production pipeline, as the data flow below. 


DataBase    --[CDC tools]-->   Kafka     --[sync]-->     Dynamic Table(kafka/hudi-hdfs)  --Flink SQL(retract stream) --> Dynamic Table 


The problem is contributed by three factors as following. 
1. Data Inflations:
1) major: Changes of input tables will cause corresponding changes in output table, e.g. join, aggregation. In the code above, every change of each row in tbl_vis_vital_signs will retract the out-dated result full of all vital signs' info and send new result. More serious, there are many vital sign records during per hospitalization, and cause too many times of retract and re-send operations which will be consumed by all downstreams.
2) minor: Each cdc update event will be split in two event: deletion of old record and insertion of new record. 
2. Kafka / Hudi-HDFS / RocksDB Append incremental data to full data: 
1) RocksDB and Hudi-HDFS use incremental model like LSM, they append incremental events to full, no matter insertion or deletion.
2) Even upsert-kafka, is implemented by inserting tombstones. 
3. Row-Based Storage


In my scenario, these factors will cause problems: 
1. A large number of low meaning intermediate results of PrimaryKey consume throughput of Flink Application. 
2. Heavy checkpoint: In every checkpoint(aligned, every 10 sec), the incremental block data of rocksdb is over a few of GB, and it takes over a few minutes if succussfully. But only a few GB data exists in HDFS checkpoint directory. 
3. Low performance of application and low stablity of TaskManager JVM. 


So, does mini-batch have an improvement of this scenario? 
Thanks for your any reply or suggestions.


Best Regards!


2021-12-13 17:10:00