Posted to user@flink.apache.org by Prasanna kumar <pr...@gmail.com> on 2022/02/02 12:52:22 UTC

Re: BroadCast State not loaded back into the job once its restarted due to a JM failure

Could someone guide me here? It has been a long time since I raised this issue.
We are stuck, and none of the solutions are working.

Prasanna

On Fri, Jan 21, 2022 at 5:48 PM Prasanna kumar <
prasannakumarramani@gmail.com> wrote:

> Is there a mechanism where we can read a checkpoint file to confirm whether
> the Broadcast state objects are properly serialized?
> I have not seen any documentation on this.
>
> Thanks,
> Prasanna.
>
> On Fri, Jan 14, 2022 at 3:55 PM Prasanna kumar <
> prasannakumarramani@gmail.com> wrote:
>
>> David,
>> I even cancelled the job manually with an externalized checkpoint.
>>
>> Then restarted the job from the same checkpoint. (I did not have any rules
>> file in the s3 location to be read by the file reader operator.) The job
>> started and ran. Still it did not load the backed-up state data from the
>> checkpoint file.
>>
>> Thanks,
>> Prasanna.
>>
>> On Thu, 13 Jan 2022, 19:05 Prasanna kumar, <pr...@gmail.com>
>> wrote:
>>
>>> David,
>>>
>>> *Instead, whatever has been saved into broadcast state in the
>>> processBroadcastElement method of your coProcessBroadcast function should
>>> be restored.*
>>>
>>> This is what I am expecting here, and that's not happening.
>>>
>>> *To save data into broadcast state you do something like this:
>>>  ctx.getBroadcastState(descriptor).put(key, value);*
>>>
>>> This is being done in the following manner:
>>>
>>> BroadcastState<String, ECState> broadcastState =
>>>     ctx.getBroadcastState(eCStateDescriptor);
>>> EventConfigState eCState = ECStateFactory.getEventConfigState(broadCastKey);
>>> if (!broadcastState.contains(broadCastKey)) {
>>>     broadcastState.put(broadCastKey, eCState);
>>> }
>>>
>>>
>>> Thanks
>>> Prasanna.
>>>
>>> On Thu, Jan 13, 2022 at 6:43 PM David Anderson <da...@apache.org>
>>> wrote:
>>>
>>>> You shouldn't expect data that was broadcast before the last checkpoint
>>>> to be rebroadcast after a restart. Instead, whatever has been saved into
>>>> broadcast state in the processBroadcastElement method of your
>>>> coProcessBroadcast function should be restored.
>>>>
>>>> To save data into broadcast state you do something like this:
>>>>
>>>>     ctx.getBroadcastState(descriptor).put(key, value);
>>>>
>>>> David
>>>>
>>>> On Thu, Jan 13, 2022 at 12:40 PM Prasanna kumar <
>>>> prasannakumarramani@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Flink version: 1.12.7
>>>>> We have the following job: read from Kafka, apply broadcast rules, and
>>>>> then write the output to Kafka.
>>>>> The rules are read from S3 and written to broadcast state.
>>>>>
>>>>> When the job gets restarted due to a JM failure:
>>>>> 1) The following code (which reads from the S3 bucket and populates the
>>>>> broadcast state) does not re-read the contents the way it does when we
>>>>> start the job at submission or when the files in the bucket change.
>>>>> *I did not see any configuration to make it read every time it starts.
>>>>> Is there a way to make this happen?*
>>>>>
>>>>>
>>>>> private DataStream<String> buildProducerControlStreamContinuously(
>>>>>     StreamExecutionEnvironment streamExecutionEnvironment, ParameterTool configParams) {
>>>>>   int s3PathMonitoringInterval =
>>>>>       configParams.getInt(AppConstant.S3_PRODUCER_CONFIG_PATH_MONITORING_INTERVAL);
>>>>>   String s3ProducerConfigPath =
>>>>>       configParams.get(AppConstant.S3_PRODUCER_CONFIG_PATH);
>>>>>   return streamExecutionEnvironment.readFile(
>>>>>       new TextInputFormat(new Path(s3ProducerConfigPath)),
>>>>>       s3ProducerConfigPath,
>>>>>       FileProcessingMode.PROCESS_CONTINUOUSLY,
>>>>>       s3PathMonitoringInterval);
>>>>> }
>>>>> 2) Even if the earlier option did not work, at least the checkpoint
>>>>> should contain the broadcast state, so the JobManager/JobMaster should
>>>>> load it back into the job. But that is not happening: post restart, the
>>>>> rules are not applied to the incoming Kafka records, and they go
>>>>> unprocessed. I also checked the metadata file; the classes of the
>>>>> broadcast state and its serializer are listed there, though it's rather
>>>>> machine-specific and I did not understand much of it.
>>>>> *Isn't loading the state the expected behavior when restarting due to a
>>>>> failure?*
>>>>>
>>>>> Topology of the Job
>>>>>
>>>>> [image: Screen Shot 2022-01-13 at 4.52.03 PM.png]
>>>>>
>>>>> *Solution tried:*
>>>>> Added one more control stream with a buildProducerControlStreamOnce
>>>>> operator alongside buildProducerControlStreamContinuously.
>>>>> buildProducerControlStreamOnce reads every time the job starts, whether
>>>>> by submission or by restart after a failure. But the drawback of this
>>>>> approach is that checkpoints fail for finished operators in version
>>>>> 1.12.x.
>>>>>
>>>>> [image: Screen Shot 2022-01-13 at 4.59.56 PM.png]
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Prasanna.
>>>>>
>>>>

Re: BroadCast State not loaded back into the job once its restarted due to a JM failure

Posted by Prasanna kumar <pr...@gmail.com>.
David,

For a short-term solution, I tried the following:

1) Updated the s3 folder before the env.execute(job) call in the same
Flink job. The file reader still did not pick it up, even though the
changes were made to the folder.
2) Updated the s3 folder from an external process (not the Flink job);
then the reader does pick up the files. But how would the external
process know when the job fails and restarts?

Is there a way to force the source operator to read from S3 at every
interval? (Right now it does not read after a restart.)

Would building a new source function similar to the following force the
reading every time we restart, or would it behave the same way?

ContinuousFileMonitoringFunction<OUT> monitoringFunction =
    new ContinuousFileMonitoringFunction<>(
        inputFormat, monitoringMode, this.getParallelism(), interval);
ContinuousFileReaderOperatorFactory<OUT, TimestampedFileInputSplit> factory =
    new ContinuousFileReaderOperatorFactory<>(inputFormat);
Boundedness boundedness = monitoringMode == FileProcessingMode.PROCESS_ONCE
    ? Boundedness.BOUNDED
    : Boundedness.CONTINUOUS_UNBOUNDED;
SingleOutputStreamOperator<OUT> source =
    this.addSource(monitoringFunction, sourceName, (TypeInformation) null, boundedness)
        .transform("Split Reader: " + sourceName, typeInfo, factory);

return new DataStreamSource<>(source);

Present Implementation:

private DataStream<String> buildProducerControlStreamContinuously(
    StreamExecutionEnvironment streamExecutionEnvironment, ParameterTool configParams) {
  int s3PathMonitoringInterval =
      configParams.getInt(AppConstant.S3_PRODUCER_CONFIG_PATH_MONITORING_INTERVAL);
  String s3ProducerConfigPath =
      configParams.get(AppConstant.S3_PRODUCER_CONFIG_PATH);
  return streamExecutionEnvironment.readFile(
      new TextInputFormat(new Path(s3ProducerConfigPath)),
      s3ProducerConfigPath,
      FileProcessingMode.PROCESS_CONTINUOUSLY,
      s3PathMonitoringInterval);
}

For a longer-term solution, I am looking at the State Processor API to
read the checkpointed state file.
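
A minimal sketch of what that could look like with the State Processor API
(the operator uid, state name, and checkpoint path below are placeholders,
and it assumes the rules are kept in a MapStateDescriptor<String, ECState>
named "eventConfigState"):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class InspectBroadcastState {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

    // Point this at the retained checkpoint's metadata directory (placeholder path).
    ExistingSavepoint savepoint = Savepoint.load(
        bEnv, "s3://my-bucket/checkpoints/<job-id>/chk-42", new MemoryStateBackend());

    // The uid and state name are assumptions; they must match the uid set on
    // the broadcast operator and the name used in its MapStateDescriptor.
    DataSet<Tuple2<String, ECState>> entries = savepoint.readBroadcastState(
        "event-config-operator",
        "eventConfigState",
        Types.STRING,
        TypeInformation.of(ECState.class));

    // Prints exactly what the checkpoint captured for the broadcast state.
    entries.print();
  }
}

If this prints the expected rules, the checkpoint is fine and the problem
is on the restore/lookup side; if it prints nothing, the rules were never
written to (or never checkpointed from) the broadcast state.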

Thanks,
Prasanna.

On Wed, Feb 2, 2022 at 7:16 PM Dawid Wysakowicz <dw...@apache.org>
wrote:

> a) That does not necessarily mean the broadcast state is not restored. I
> don't know what your logic is for generating events or handling rules. Have
> you verified that there is nothing in the BroadcastState you use there?
>
> b) That's expected, as it had read from the files before the restore.
>
> I have read the entire thread.
>
> Best,
>
> Dawid
> On 02/02/2022 14:34, Prasanna kumar wrote:
>
> David,
>
> *May I ask you how do you tell there is no data restored into the
> broadcast state?*
>
> When the new leader JM comes up after existing JM dies and the job
> restarts ,
> a) the events are not getting generated, the event rules are part of the
> broadcast state. Prior to the restart the events are generated.
> b) Also the file source reader does not read from the files where the
> rules are stored.  (more details in my first mail in this chain)
> [image: image.png]
>
> Meanwhile will go through the StateProcessor API.
>
> Thanks,
> Prasanna.
>
> On Wed, Feb 2, 2022 at 6:37 PM Dawid Wysakowicz <dw...@apache.org>
> wrote:
>
>> Hey Prasanna,
>>
>> Sorry to hear you have not found a solution yet.
>>
>> May I ask you how do you tell there is no data restored into the
>> broadcast state? Have you tried simplifying your pipeline to limit the
>> potential causes of the problem? Could you maybe share the code of your
>> (Keyed)BroadcastProcessFunction?
>>
>> As for looking into the checkpoint itself. You could try to check if
>> State Processor API[1] could be of help for you.
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>> On 02/02/2022 13:52, Prasanna kumar wrote:
>>
>> Could some one guide me here ? It had been long since i raised this issue
>> .
>> We are stuck here and none of the solutions are working .
>>
>> Prasanna
>>
>> On Fri, Jan 21, 2022 at 5:48 PM Prasanna kumar <
>> prasannakumarramani@gmail.com> wrote:
>>
>>> Is there a mechanism where we can read checkpoint file to confirm
>>> whether the Broadcast state objects are properly serialized ?
>>> I have not seen any documentation on the same.
>>>
>>> Thanks,
>>> Prasanna.
>>>
>>> On Fri, Jan 14, 2022 at 3:55 PM Prasanna kumar <
>>> prasannakumarramani@gmail.com> wrote:
>>>
>>>> David,
>>>> I even cancelled the job manually with externalized checkpoint.
>>>>
>>>> Then restarted the job with same checkpoint. (I did not have any rules
>>>> file in the s3 location to be read by the file reader operator).  The job
>>>> started and ran. Still it did not load the backed up state data from
>>>> checkpoint file.
>>>>
>>>> Thanks,
>>>> Prasanna.
>>>>
>>>> On Thu, 13 Jan 2022, 19:05 Prasanna kumar, <
>>>> prasannakumarramani@gmail.com> wrote:
>>>>
>>>>> David,
>>>>>
>>>>> *Instead, whatever has been saved into broadcast state in the
>>>>> processBroadcastElement method of your coProcessBroadcast function should
>>>>> be restored.*=>
>>>>>
>>>>> This is what i am expecting here and thats not happening.
>>>>>
>>>>> *To save data into broadcast state you do something like this:
>>>>>  ctx.getBroadcastState(descriptor).put(key, value);*
>>>>>
>>>>> This is being done in the following manner
>>>>>
>>>>> BroadcastState<String, ECState> broadcastState =
>>>>>     ctx.getBroadcastState(eCStateDescriptor);
>>>>> EventConfigState eCState = ECStateFactory.getEventConfigState(broadCastKey);
>>>>> if (!broadcastState.contains(broadCastKey)) {
>>>>>     broadcastState.put(broadCastKey, eCState);
>>>>> }
>>>>>
>>>>>
>>>>> Thanks
>>>>> Prasanna.
>>>>>
>>>>> On Thu, Jan 13, 2022 at 6:43 PM David Anderson <da...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> You shouldn't expect data that was broadcast before the last
>>>>>> checkpoint to be rebroadcast after a restart. Instead, whatever has been
>>>>>> saved into broadcast state in the processBroadcastElement method of your
>>>>>> coProcessBroadcast function should be restored.
>>>>>>
>>>>>> To save data into broadcast state you do something like this:
>>>>>>
>>>>>>     ctx.getBroadcastState(descriptor).put(key, value);
>>>>>>
>>>>>> David
>>>>>>
>>>>>> On Thu, Jan 13, 2022 at 12:40 PM Prasanna kumar <
>>>>>> prasannakumarramani@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> FLINK version 1.12.7
>>>>>>> We have the following job.
>>>>>>> Source Read from Kafka apply broadcast rules and then write output
>>>>>>> to kafka.
>>>>>>> Rules are being read from S3 and written to broadcast state.
>>>>>>>
>>>>>>> When the Job gets restarted due to a JM failure.
>>>>>>> 1) The Following code (to read from the S3 bucket and populate to
>>>>>>> the broadcast state) does not read the contents as it reads when we start
>>>>>>> the job during submission or when the files in the bucket changes.
>>>>>>> *I did not see any configurations to make it read everytime it
>>>>>>> starts ? Is there a way to make this happen ?*
>>>>>>>
>>>>>>>
>>>>>>> private DataStream<String> buildProducerControlStreamContinuously(
>>>>>>>     StreamExecutionEnvironment streamExecutionEnvironment, ParameterTool configParams) {
>>>>>>>   int s3PathMonitoringInterval =
>>>>>>>       configParams.getInt(AppConstant.S3_PRODUCER_CONFIG_PATH_MONITORING_INTERVAL);
>>>>>>>   String s3ProducerConfigPath =
>>>>>>>       configParams.get(AppConstant.S3_PRODUCER_CONFIG_PATH);
>>>>>>>   return streamExecutionEnvironment.readFile(
>>>>>>>       new TextInputFormat(new Path(s3ProducerConfigPath)),
>>>>>>>       s3ProducerConfigPath,
>>>>>>>       FileProcessingMode.PROCESS_CONTINUOUSLY,
>>>>>>>       s3PathMonitoringInterval);
>>>>>>> }
>>>>>>> 2) Even if the earlier option did not work , at least the checkpoint
>>>>>>> should have the broadcast state in picture, so the Job Manager/JobMaster
>>>>>>> should be loading it back to the job but it is not happening as we see post
>>>>>>> restart , the rules are not applied on the incoming kafka records and they
>>>>>>> go unprocessed.  I also checked the metadata file the classes of broadcast
>>>>>>> state , serializer are listed. though it's more machine specific, I did not
>>>>>>> understand a lot .
>>>>>>> *Isn't the loading of the state an expected behavior when restarting
>>>>>>> due to a failure?*
>>>>>>>
>>>>>>> Topology of the Job
>>>>>>>
>>>>>>> [image: Screen Shot 2022-01-13 at 4.52.03 PM.png]
>>>>>>>
>>>>>>> *Solution Tried.  *
>>>>>>> Added one more control stream where we have
>>>>>>> buildProducerControlStreamOnce Operator apart from
>>>>>>> buildProducerControlStreamContinuously.
>>>>>>> Here the buildProducerControlStreamOnce reads every time the job
>>>>>>> starts whether it's by submission or restart during a failure. But drawback
>>>>>>> of this approach is that checkpoints get failed here for finished operators
>>>>>>> in version 1.12.x
>>>>>>>
>>>>>>> [image: Screen Shot 2022-01-13 at 4.59.56 PM.png]
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Prasanna.
>>>>>>>
>>>>>>

Re: BroadCast State not loaded back into the job once its restarted due to a JM failure

Posted by Dawid Wysakowicz <dw...@apache.org>.
a) That does not necessarily mean the broadcast state is not restored. I
don't know what your logic is for generating events or handling rules. Have
you verified that there is nothing in the BroadcastState you use there? (A
sketch of one way to check follows below.)

b) That's expected: the continuous file monitor had already read those files
before the restore, and it checkpoints the newest modification timestamp it
has seen, so after a restore it only picks up files modified later.
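
One way to verify is to dump the read-only broadcast view from
processElement once the job is running again. A sketch, reusing the
eCStateDescriptor from the snippets earlier in the thread and assuming
String stream types (the real signatures depend on the actual function):

// Inside the (Keyed)BroadcastProcessFunction; LOG is an org.slf4j.Logger,
// and java.util.Map is imported.
@Override
public void processElement(String event, ReadOnlyContext ctx, Collector<String> out)
        throws Exception {
    ReadOnlyBroadcastState<String, ECState> state =
            ctx.getBroadcastState(eCStateDescriptor);
    int count = 0;
    for (Map.Entry<String, ECState> entry : state.immutableEntries()) {
        LOG.info("broadcast entry: {} -> {}", entry.getKey(), entry.getValue());
        count++;
    }
    LOG.info("broadcast state holds {} entries", count);
    // ... normal processing of the event continues here ...
}

If these log lines show entries right after the restart, before any new
broadcast element arrives, the state was restored and the problem is in how
it is read; if they are empty, the checkpoint genuinely did not carry the
rules.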

I have read the entire thread.

Best,

Dawid

On 02/02/2022 14:34, Prasanna kumar wrote:
> David,
>
> *May I ask you how do you tell there is no data restored into the 
> broadcast state?*
> When the new leader JM comes up after existing JM dies and the job 
> restarts ,
> a) the events are not getting generated, the event rules are part of 
> the broadcast state. Prior to the restart the events are generated.
> b) Also the file source reader does not read from the files where the 
> rules are stored.  (more details in my first mail in this chain)
> image.png
>
> Meanwhile will go through the StateProcessor API.
>
> Thanks,
> Prasanna.
>
> On Wed, Feb 2, 2022 at 6:37 PM Dawid Wysakowicz 
> <dw...@apache.org> wrote:
>
>     Hey Prasanna,
>
>     Sorry to hear you have not found a solution yet.
>
>     May I ask you how do you tell there is no data restored into the
>     broadcast state? Have you tried simplifying your pipeline to limit
>     the potential causes of the problem? Could you maybe share the
>     code of your (Keyed)BroadcastProcessFunction?
>
>     As for looking into the checkpoint itself. You could try to check
>     if State Processor API[1] could be of help for you.
>
>     Best,
>
>     Dawid
>
>     [1]
>     https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>
>     On 02/02/2022 13:52, Prasanna kumar wrote:
>>     Could some one guide me here ? It had been long since i raised
>>     this issue .
>>     We are stuck here and none of the solutions are working .
>>
>>     Prasanna
>>
>>     On Fri, Jan 21, 2022 at 5:48 PM Prasanna kumar
>>     <pr...@gmail.com> wrote:
>>
>>         Is there a mechanism where we can read checkpoint file to
>>         confirm whether the Broadcast state objects are properly
>>         serialized ?
>>         I have not seen any documentation on the same.
>>
>>         Thanks,
>>         Prasanna.
>>
>>         On Fri, Jan 14, 2022 at 3:55 PM Prasanna kumar
>>         <pr...@gmail.com> wrote:
>>
>>             David,
>>             I even cancelled the job manually with externalized
>>             checkpoint.
>>
>>             Then restarted the job with same checkpoint. (I did not
>>             have any rules file in the s3 location to be read by the
>>             file reader operator).  The job started and ran. Still it
>>             did not load the backed up state data from checkpoint file.
>>
>>             Thanks,
>>             Prasanna.
>>
>>             On Thu, 13 Jan 2022, 19:05 Prasanna kumar,
>>             <pr...@gmail.com> wrote:
>>
>>                 David,
>>
>>                 *Instead, whatever has been saved into broadcast
>>                 state in the processBroadcastElement method of your
>>                 coProcessBroadcast function should be restored.*=>
>>
>>                 This is what i am expecting here and thats not
>>                 happening.
>>
>>                 *To save data into broadcast state you do something
>>                 like this:
>>                  ctx.getBroadcastState(descriptor).put(key, value);*
>>
>>                 This is being done in the following manner
>>
>>                 BroadcastState<String, ECState> broadcastState =
>>                     ctx.getBroadcastState(eCStateDescriptor);
>>                 EventConfigState eCState = ECStateFactory.getEventConfigState(broadCastKey);
>>                 if (!broadcastState.contains(broadCastKey)) {
>>                     broadcastState.put(broadCastKey, eCState);
>>                 }
>>
>>
>>                 Thanks
>>                 Prasanna.
>>
>>                 On Thu, Jan 13, 2022 at 6:43 PM David Anderson
>>                 <da...@apache.org> wrote:
>>
>>                     You shouldn't expect data that was broadcast
>>                     before the last checkpoint to be rebroadcast
>>                     after a restart. Instead, whatever has been saved
>>                     into broadcast state in the
>>                     processBroadcastElement method of your
>>                     coProcessBroadcast function should be restored.
>>
>>                     To save data into broadcast state you do
>>                     something like this:
>>
>>                     ctx.getBroadcastState(descriptor).put(key, value);
>>
>>                     David
>>
>>                     On Thu, Jan 13, 2022 at 12:40 PM Prasanna kumar
>>                     <pr...@gmail.com> wrote:
>>
>>                         Hi,
>>
>>                         FLINK version 1.12.7
>>                         We have the following job.
>>                         Source Read from Kafka apply broadcast rules
>>                         and then write output to kafka.
>>                         Rules are being read from S3 and written to
>>                         broadcast state.
>>
>>                         When the Job gets restarted due to a JM failure.
>>                         1) The Following code (to read from the S3
>>                         bucket and populate to the broadcast state)
>>                         does not read the contents as it reads when
>>                         we start the job during submission or when
>>                         the files in the bucket changes.
>>                         /*I did not see any configurations to make it
>>                         read everytime it starts ? Is there a way to
>>                         make this happen ?*/
>>
>>                         private DataStream<String> buildProducerControlStreamContinuously(
>>                             StreamExecutionEnvironment streamExecutionEnvironment,
>>                             ParameterTool configParams) {
>>                           int s3PathMonitoringInterval =
>>                               configParams.getInt(AppConstant.S3_PRODUCER_CONFIG_PATH_MONITORING_INTERVAL);
>>                           String s3ProducerConfigPath =
>>                               configParams.get(AppConstant.S3_PRODUCER_CONFIG_PATH);
>>                           return streamExecutionEnvironment.readFile(
>>                               new TextInputFormat(new Path(s3ProducerConfigPath)),
>>                               s3ProducerConfigPath,
>>                               FileProcessingMode.PROCESS_CONTINUOUSLY,
>>                               s3PathMonitoringInterval);
>>                         }
>>                         2) Even if the earlier option did not work ,
>>                         at least the checkpoint should have the
>>                         broadcast state in picture, so the Job
>>                         Manager/JobMaster should be loading it back
>>                         to the job but it is not happening as we see
>>                         post restart , the rules are not applied on
>>                         the incoming kafka records and they go
>>                         unprocessed.  I also checked the metadata
>>                         file the classes of broadcast state ,
>>                         serializer are listed. though it's more
>>                         machine specific, I did not understand a lot .
>>                         */Isn't the loading of the state an expected
>>                         behavior when restarting due to a failure?/*
>>
>>                         Topology of the Job
>>
>>                         Screen Shot 2022-01-13 at 4.52.03 PM.png
>>
>>                         *Solution Tried. *
>>                         Added one more control stream where we have
>>                         buildProducerControlStreamOnce Operator apart
>>                         from buildProducerControlStreamContinuously.
>>                         Here the buildProducerControlStreamOnce reads
>>                         every time the job starts whether it's by
>>                         submission or restart during a failure. But
>>                         drawback of this approach is that checkpoints
>>                         get failed here for finished operators in
>>                         version 1.12.x
>>
>>                         Screen Shot 2022-01-13 at 4.59.56 PM.png
>>
>>
>>                         Thanks,
>>                         Prasanna.
>>

Re: BroadCast State not loaded back into the job once its restarted due to a JM failure

Posted by Prasanna kumar <pr...@gmail.com>.
David,

*May I ask you how do you tell there is no data restored into the broadcast
state?*

When the new leader JM comes up after the existing JM dies and the job
restarts:
a) The events are not getting generated; the event rules are part of the
broadcast state. Prior to the restart, the events were generated.
b) The file source reader also does not read from the files where the
rules are stored. (More details in my first mail in this chain.)
[image: image.png]

Meanwhile, I will go through the State Processor API.

Thanks,
Prasanna.

On Wed, Feb 2, 2022 at 6:37 PM Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hey Prasanna,
>
> Sorry to hear you have not found a solution yet.
>
> May I ask you how do you tell there is no data restored into the broadcast
> state? Have you tried simplifying your pipeline to limit the potential
> causes of the problem? Could you maybe share the code of your
> (Keyed)BroadcastProcessFunction?
>
> As for looking into the checkpoint itself. You could try to check if State
> Processor API[1] could be of help for you.
>
> Best,
>
> Dawid
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
> On 02/02/2022 13:52, Prasanna kumar wrote:
>
> Could some one guide me here ? It had been long since i raised this issue .
> We are stuck here and none of the solutions are working .
>
> Prasanna
>
> On Fri, Jan 21, 2022 at 5:48 PM Prasanna kumar <
> prasannakumarramani@gmail.com> wrote:
>
>> Is there a mechanism where we can read checkpoint file to confirm whether
>> the Broadcast state objects are properly serialized ?
>> I have not seen any documentation on the same.
>>
>> Thanks,
>> Prasanna.
>>
>> On Fri, Jan 14, 2022 at 3:55 PM Prasanna kumar <
>> prasannakumarramani@gmail.com> wrote:
>>
>>> David,
>>> I even cancelled the job manually with externalized checkpoint.
>>>
>>> Then restarted the job with same checkpoint. (I did not have any rules
>>> file in the s3 location to be read by the file reader operator).  The job
>>> started and ran. Still it did not load the backed up state data from
>>> checkpoint file.
>>>
>>> Thanks,
>>> Prasanna.
>>>
>>> On Thu, 13 Jan 2022, 19:05 Prasanna kumar, <
>>> prasannakumarramani@gmail.com> wrote:
>>>
>>>> David,
>>>>
>>>> *Instead, whatever has been saved into broadcast state in the
>>>> processBroadcastElement method of your coProcessBroadcast function should
>>>> be restored.*=>
>>>>
>>>> This is what i am expecting here and thats not happening.
>>>>
>>>> *To save data into broadcast state you do something like this:
>>>>  ctx.getBroadcastState(descriptor).put(key, value);*
>>>>
>>>> This is being done in the following manner
>>>>
>>>> BroadcastState<String, ECState> broadcastState =
>>>>     ctx.getBroadcastState(eCStateDescriptor);
>>>> EventConfigState eCState = ECStateFactory.getEventConfigState(broadCastKey);
>>>> if (!broadcastState.contains(broadCastKey)) {
>>>>     broadcastState.put(broadCastKey, eCState);
>>>> }
>>>>
>>>>
>>>> Thanks
>>>> Prasanna.
>>>>
>>>> On Thu, Jan 13, 2022 at 6:43 PM David Anderson <da...@apache.org>
>>>> wrote:
>>>>
>>>>> You shouldn't expect data that was broadcast before the last
>>>>> checkpoint to be rebroadcast after a restart. Instead, whatever has been
>>>>> saved into broadcast state in the processBroadcastElement method of your
>>>>> coProcessBroadcast function should be restored.
>>>>>
>>>>> To save data into broadcast state you do something like this:
>>>>>
>>>>>     ctx.getBroadcastState(descriptor).put(key, value);
>>>>>
>>>>> David
>>>>>
>>>>> On Thu, Jan 13, 2022 at 12:40 PM Prasanna kumar <
>>>>> prasannakumarramani@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> FLINK version 1.12.7
>>>>>> We have the following job.
>>>>>> Source Read from Kafka apply broadcast rules and then write output to
>>>>>> kafka.
>>>>>> Rules are being read from S3 and written to broadcast state.
>>>>>>
>>>>>> When the Job gets restarted due to a JM failure.
>>>>>> 1) The Following code (to read from the S3 bucket and populate to the
>>>>>> broadcast state) does not read the contents as it reads when we start the
>>>>>> job during submission or when the files in the bucket changes.
>>>>>> *I did not see any configurations to make it read everytime it starts
>>>>>> ? Is there a way to make this happen ?*
>>>>>>
>>>>>> private DataStream<String> buildProducerControlStreamContinuously(
>>>>>>     StreamExecutionEnvironment streamExecutionEnvironment, ParameterTool configParams) {
>>>>>>   int s3PathMonitoringInterval =
>>>>>>       configParams.getInt(AppConstant.S3_PRODUCER_CONFIG_PATH_MONITORING_INTERVAL);
>>>>>>   String s3ProducerConfigPath =
>>>>>>       configParams.get(AppConstant.S3_PRODUCER_CONFIG_PATH);
>>>>>>   return streamExecutionEnvironment.readFile(
>>>>>>       new TextInputFormat(new Path(s3ProducerConfigPath)),
>>>>>>       s3ProducerConfigPath,
>>>>>>       FileProcessingMode.PROCESS_CONTINUOUSLY,
>>>>>>       s3PathMonitoringInterval);
>>>>>> }
>>>>>> 2) Even if the earlier option did not work , at least the checkpoint
>>>>>> should have the broadcast state in picture, so the Job Manager/JobMaster
>>>>>> should be loading it back to the job but it is not happening as we see post
>>>>>> restart , the rules are not applied on the incoming kafka records and they
>>>>>> go unprocessed.  I also checked the metadata file the classes of broadcast
>>>>>> state , serializer are listed. though it's more machine specific, I did not
>>>>>> understand a lot .
>>>>>> *Isn't the loading of the state an expected behavior when restarting
>>>>>> due to a failure?*
>>>>>>
>>>>>> Topology of the Job
>>>>>>
>>>>>> [image: Screen Shot 2022-01-13 at 4.52.03 PM.png]
>>>>>>
>>>>>> *Solution Tried.  *
>>>>>> Added one more control stream where we have
>>>>>> buildProducerControlStreamOnce Operator apart from
>>>>>> buildProducerControlStreamContinuously.
>>>>>> Here the buildProducerControlStreamOnce reads every time the job
>>>>>> starts whether it's by submission or restart during a failure. But drawback
>>>>>> of this approach is that checkpoints get failed here for finished operators
>>>>>> in version 1.12.x
>>>>>>
>>>>>> [image: Screen Shot 2022-01-13 at 4.59.56 PM.png]
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Prasanna.
>>>>>>
>>>>>

Re: BroadCast State not loaded back into the job once its restarted due to a JM failure

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hey Prasanna,

Sorry to hear you have not found a solution yet.

May I ask how you tell that there is no data restored into the
broadcast state? Have you tried simplifying your pipeline to narrow down
the potential causes of the problem? Could you maybe share the code of
your (Keyed)BroadcastProcessFunction?

As for looking into the checkpoint itself, you could check whether the
State Processor API [1] could be of help to you.

Best,

Dawid

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
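
For reference, a minimal sketch of the general shape of the function being
asked about (the descriptor name, the ECState type, and the getKey()
accessor are assumptions based on the snippets earlier in this thread, not
the actual job code):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RuleApplyingFunction
        extends BroadcastProcessFunction<String, ECState, String> {

    // Should be the same descriptor (same name and types) that is passed to
    // ruleStream.broadcast(...); the broadcast state is defined by that call.
    public static final MapStateDescriptor<String, ECState> EC_STATE_DESCRIPTOR =
            new MapStateDescriptor<>(
                    "eventConfigState", Types.STRING, TypeInformation.of(ECState.class));

    @Override
    public void processBroadcastElement(ECState rule, Context ctx, Collector<String> out)
            throws Exception {
        // Only what is put into broadcast state here is checkpointed and restored.
        ctx.getBroadcastState(EC_STATE_DESCRIPTOR).put(rule.getKey(), rule);
    }

    @Override
    public void processElement(String event, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        ECState rule = ctx.getBroadcastState(EC_STATE_DESCRIPTOR).get(event);
        if (rule != null) {
            out.collect(event); // the real job would apply the rule here
        }
    }
}

The wiring would look like
ruleStream.broadcast(RuleApplyingFunction.EC_STATE_DESCRIPTOR) connected to
the main stream; the descriptor used in broadcast(...) and in
getBroadcastState(...) must match for the checkpointed state to be found
after a restore.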

On 02/02/2022 13:52, Prasanna kumar wrote:
> Could some one guide me here ? It had been long since i raised this 
> issue .
> We are stuck here and none of the solutions are working .
>
> Prasanna
>
> On Fri, Jan 21, 2022 at 5:48 PM Prasanna kumar 
> <pr...@gmail.com> wrote:
>
>     Is there a mechanism where we can read checkpoint file to confirm
>     whether the Broadcast state objects are properly serialized ?
>     I have not seen any documentation on the same.
>
>     Thanks,
>     Prasanna.
>
>     On Fri, Jan 14, 2022 at 3:55 PM Prasanna kumar
>     <pr...@gmail.com> wrote:
>
>         David,
>         I even cancelled the job manually with externalized checkpoint.
>
>         Then restarted the job with same checkpoint. (I did not have
>         any rules file in the s3 location to be read by the file
>         reader operator).  The job started and ran. Still it did not
>         load the backed up state data from checkpoint file.
>
>         Thanks,
>         Prasanna.
>
>         On Thu, 13 Jan 2022, 19:05 Prasanna kumar,
>         <pr...@gmail.com> wrote:
>
>             David,
>
>             *Instead, whatever has been saved into broadcast state in
>             the processBroadcastElement method of your
>             coProcessBroadcast function should be restored.*=>
>
>             This is what i am expecting here and thats not happening.
>
>             *To save data into broadcast state you do something like
>             this:  ctx.getBroadcastState(descriptor).put(key, value);*
>
>             This is being done in the following manner
>
>             BroadcastState<String, ECState> broadcastState =
>                 ctx.getBroadcastState(eCStateDescriptor);
>             EventConfigState eCState = ECStateFactory.getEventConfigState(broadCastKey);
>             if (!broadcastState.contains(broadCastKey)) {
>                 broadcastState.put(broadCastKey, eCState);
>             }
>
>
>             Thanks
>             Prasanna.
>
>             On Thu, Jan 13, 2022 at 6:43 PM David Anderson
>             <da...@apache.org> wrote:
>
>                 You shouldn't expect data that was broadcast before
>                 the last checkpoint to be rebroadcast after a restart.
>                 Instead, whatever has been saved into broadcast state
>                 in the processBroadcastElement method of your
>                 coProcessBroadcast function should be restored.
>
>                 To save data into broadcast state you do something
>                 like this:
>
>                 ctx.getBroadcastState(descriptor).put(key, value);
>
>                 David
>
>                 On Thu, Jan 13, 2022 at 12:40 PM Prasanna kumar
>                 <pr...@gmail.com> wrote:
>
>                     Hi,
>
>                     FLINK version 1.12.7
>                     We have the following job.
>                     Source Read from Kafka apply broadcast rules and
>                     then write output to kafka.
>                     Rules are being read from S3 and written to
>                     broadcast state.
>
>                     When the Job gets restarted due to a JM failure.
>                     1) The Following code (to read from the S3 bucket
>                     and populate to the broadcast state) does not read
>                     the contents as it reads when we start the job
>                     during submission or when the files in the bucket
>                     changes.
>                     /*I did not see any configurations to make it read
>                     everytime it starts ? Is there a way to make this
>                     happen ?*/
>
>                     private DataStream<String> buildProducerControlStreamContinuously(
>                         StreamExecutionEnvironment streamExecutionEnvironment,
>                         ParameterTool configParams) {
>                       int s3PathMonitoringInterval =
>                           configParams.getInt(AppConstant.S3_PRODUCER_CONFIG_PATH_MONITORING_INTERVAL);
>                       String s3ProducerConfigPath =
>                           configParams.get(AppConstant.S3_PRODUCER_CONFIG_PATH);
>                       return streamExecutionEnvironment.readFile(
>                           new TextInputFormat(new Path(s3ProducerConfigPath)),
>                           s3ProducerConfigPath,
>                           FileProcessingMode.PROCESS_CONTINUOUSLY,
>                           s3PathMonitoringInterval);
>                     }
>                     2) Even if the earlier option did not work , at
>                     least the checkpoint should have the broadcast
>                     state in picture, so the Job Manager/JobMaster
>                     should be loading it back to the job but it is not
>                     happening as we see post restart , the rules are
>                     not applied on the incoming kafka records and they
>                     go unprocessed.  I also checked the metadata file
>                     the classes of broadcast state , serializer are
>                     listed. though it's more machine specific, I did
>                     not understand a lot .
>                     */Isn't the loading of the state an expected
>                     behavior when restarting due to a failure?/*
>
>                     Topology of the Job
>
>                     Screen Shot 2022-01-13 at 4.52.03 PM.png
>
>                     *Solution Tried. *
>                     Added one more control stream where we have
>                     buildProducerControlStreamOnce Operator apart from
>                     buildProducerControlStreamContinuously.
>                     Here the buildProducerControlStreamOnce reads
>                     every time the job starts whether it's by
>                     submission or restart during a failure. But
>                     drawback of this approach is that checkpoints get
>                     failed here for finished operators in version 1.12.x
>
>                     Screen Shot 2022-01-13 at 4.59.56 PM.png
>
>
>                     Thanks,
>                     Prasanna.
>