Posted to issues@flink.apache.org by "Zhipeng Zhang (Jira)" <ji...@apache.org> on 2023/04/24 06:48:00 UTC

[jira] [Updated] (FLINK-31901) AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records

     [ https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhipeng Zhang updated FLINK-31901:
----------------------------------
    Description: 
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input until all broadcast inputs have been processed. Once the broadcast variables are ready, it first processes the cached records and then continues with the newly arrived records.
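
As a rough illustration of this caching behavior, the following self-contained sketch models it in plain Java with no Flink dependency. The class `CachingWrapper` and its methods `onRecord`, `onBroadcastReady`, and `cachedCount` are hypothetical names chosen for this example; they are not the API of `AbstractBroadcastWrapperOperator`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy model of "cache until the broadcast inputs are ready"; hypothetical, not Flink ML code. */
public class CachingWrapper<T> {
    private final Queue<T> cache = new ArrayDeque<>();
    private final Consumer<T> downstream;
    private boolean broadcastReady = false;

    public CachingWrapper(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Called for each non-broadcast record. */
    public void onRecord(T record) {
        if (broadcastReady) {
            downstream.accept(record); // broadcast variables available: process directly
        } else {
            cache.add(record);         // broadcast variables missing: hold the record back
        }
    }

    /** Called once all broadcast inputs have been consumed. */
    public void onBroadcastReady() {
        broadcastReady = true;
        // Replays every cached record in one uninterrupted loop.
        while (!cache.isEmpty()) {
            downstream.accept(cache.poll());
        }
    }

    /** Number of records currently held in the cache. */
    public int cachedCount() {
        return cache.size();
    }
}
```

In this sketch the replay loop in `onBroadcastReady` does not return until the cache is empty, so nothing else can run on the calling thread during the replay.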

 

Cached elements are processed via `Input#processElement` and `Input#processWatermark`. However, this can take a long time when many records are cached, and checkpoint barriers can be blocked in the meantime.

 

Running the demo code at [1] produces logs like the following.
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2
processed cached records, cnt: 10000
processed cached records, cnt: 20000
processed cached records, cnt: 30000
processed cached records, cnt: 40000
processed cached records, cnt: 50000
processed cached records, cnt: 60000
processed cached records, cnt: 70000
processed cached records, cnt: 80000
processed cached records, cnt: 90000
processed cached records, cnt: 100000
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 {code}
 

From line 3 to line 12 of the log, no checkpoint is taken: the barriers are blocked until all cached elements have been processed.
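
The pattern in the log can be reproduced with a toy single-threaded model. This is only a sketch under a simplifying assumption: one thread both replays cached records and handles barriers, so a barrier that arrives during a replay call waits until that call returns. The names `TaskLoopDemo`, `processedWhenCheckpointRuns`, `chunkSize`, and `barrierAt` are hypothetical. Setting `chunkSize` equal to the cache size models the uninterrupted replay; a smaller `chunkSize` illustrates one possible way to let barriers interleave and is not necessarily the fix adopted for this issue.

```java
/** Toy model of a single task thread; hypothetical, not Flink code. */
public class TaskLoopDemo {

    /**
     * Replays {@code cached} records in chunks of {@code chunkSize} (must be > 0).
     * A barrier arrives once {@code barrierAt} records have been processed, but it
     * can only be handled between chunks because one thread does both jobs.
     * Returns how many records had been processed when the checkpoint ran.
     */
    public static int processedWhenCheckpointRuns(int cached, int chunkSize, int barrierAt) {
        int processed = 0;
        boolean barrierPending = false;
        while (processed < cached) {
            int end = Math.min(cached, processed + chunkSize);
            while (processed < end) {
                processed++;
                if (processed == barrierAt) {
                    barrierPending = true; // barrier arrived while the thread is busy
                }
            }
            if (barrierPending) {
                return processed; // chunk finished, the barrier is handled now
            }
        }
        return processed; // barrier (if any) is handled after the whole replay
    }

    public static void main(String[] args) {
        // One uninterrupted replay: the checkpoint waits for all records.
        System.out.println(processedWhenCheckpointRuns(100_000, 100_000, 25_000)); // prints 100000
        // Chunks of 10000: the checkpoint runs after the chunk that contains the barrier.
        System.out.println(processedWhenCheckpointRuns(100_000, 10_000, 25_000));  // prints 30000
    }
}
```

The first call corresponds to the log above, where no checkpoint appears until the counter reaches 100000.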

 

  [1] https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case

  was:
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input until all broadcast inputs have been processed. Once the broadcast variables are ready, it first processes the cached records and then continues with the newly arrived records.

 

Cached elements are processed via `Input#processElement` and `Input#processWatermark`. However, this can take a long time when many records are cached, and checkpoint barriers can be blocked in the meantime.


> AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-31901
>                 URL: https://issues.apache.org/jira/browse/FLINK-31901
>             Project: Flink
>          Issue Type: Improvement
>          Components: Library / Machine Learning
>            Reporter: Zhipeng Zhang
>            Priority: Major
>             Fix For: ml-2.3.0
>
>
> Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input until all broadcast inputs have been processed. Once the broadcast variables are ready, it first processes the cached records and then continues with the newly arrived records.
>  
> Cached elements are processed via `Input#processElement` and `Input#processWatermark`. However, this can take a long time when many records are cached, and checkpoint barriers can be blocked in the meantime.
>  
> Running the demo code at [1] produces logs like the following.
> {code:java}
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2
> processed cached records, cnt: 10000
> processed cached records, cnt: 20000
> processed cached records, cnt: 30000
> processed cached records, cnt: 40000
> processed cached records, cnt: 50000
> processed cached records, cnt: 60000
> processed cached records, cnt: 70000
> processed cached records, cnt: 80000
> processed cached records, cnt: 90000
> processed cached records, cnt: 100000
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 {code}
>  
> From line 3 to line 12 of the log, no checkpoint is taken: the barriers are blocked until all cached elements have been processed.
>  
>   [1] https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case


