Posted to user@flink.apache.org by Talat Uyarer via user <us...@flink.apache.org> on 2023/02/01 07:34:07 UTC

Reducing Checkpoint Count for Chain Operator

Hi,

We have a job that reads from Kafka and writes to some endpoints. The
job has no shuffling steps. I implemented it as multiple steps, and Flink
chained those operators into a single operator chain at submission time.
However, I see that every operator takes its own checkpoint.

Is there any way to create one checkpoint object per operator chain rather
than one per operator?

Thanks

Re: Reducing Checkpoint Count for Chain Operator

Posted by Talat Uyarer via user <us...@flink.apache.org>.
Hi Schwalbe,


>    - There is no way to have only one file unless you lower the
>    parallelism to 1 (= only one subtask)
>
>
Even with a parallelism of 1 there are multiple checkpoint files for the
chained operators.


>    - So which files do you see: 1 “_metadata” + multiple data files (or
>    just one)?
>
>
Yes. For each checkpoint we have a folder named with the checkpoint number.
That folder has one metadata file and one file per operator, identified by
the vertex UUID.
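
To make the "which files do you see" question concrete, the contents of one checkpoint folder can be tallied with a short script. This is only a minimal sketch and not something posted in the thread: it assumes a folder that holds a `_metadata` file plus some data files, and the function name `summarize_checkpoint_dir`, the folder name `chk-42`, and the demo file names and sizes are all made up for illustration.

```python
import tempfile
from pathlib import Path


def summarize_checkpoint_dir(chk_dir):
    """Return (data_file_count, data_bytes, metadata_bytes) for one checkpoint folder."""
    data_files = 0
    data_bytes = 0
    metadata_bytes = 0
    for path in Path(chk_dir).iterdir():
        if not path.is_file():
            continue  # ignore sub-directories
        size = path.stat().st_size
        if path.name == "_metadata":
            metadata_bytes = size
        else:
            data_files += 1
            data_bytes += size
    return data_files, data_bytes, metadata_bytes


# Demo on a throwaway directory; names and sizes are hypothetical.
with tempfile.TemporaryDirectory() as tmp:
    chk = Path(tmp) / "chk-42"
    chk.mkdir()
    (chk / "_metadata").write_bytes(b"m" * 100)
    (chk / "data-file-a").write_bytes(b"a" * 3000)
    (chk / "data-file-b").write_bytes(b"b" * 5000)
    summary = summarize_checkpoint_dir(chk)

print(summary)
```

Running something like this against a real checkpoint folder would show how many data files there are and how much of the total size each category accounts for.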


>    - The idea of having multiple files is to allow multiple threads to
>    store checkpoints at the same time and, when restarting from a
>    checkpoint, to read from more files, potentially distributed across
>    multiple physical hard drives (more I/O capacity)
>
>
Yes, I am well aware of why we have multiple files for operators. But having
a file per operator, when those operators run in the same thread of the
operator chain, is redundant and increases the checkpoint size. I believe the
operator chain driver could handle checkpointing for the whole chain at once.
That would reduce the total size of the checkpoint, because all chained
operators use the same memory; if object reuse is enabled, they even use the
exact same objects.

> Still (out of curiosity) why would you want to have everything in a single
> file?


I don't want a single file for the whole checkpoint. I want one file per
operator chain, rather than one file per operator in the chain. With high
parallelism the checkpoint size can reach gigabytes, and because of that
size we cannot checkpoint as frequently as we would like. Operator chaining
is a really good optimization in terms of memory usage, but it is still
lacking in terms of checkpointing. Today we are using Dataflow, which has
similar behavior with checkpoint support that they call pipeline fusion. [1]

[1]
https://cloud.google.com/dataflow/docs/pipeline-lifecycle#fusion_optimization

Thanks

On Thu, Feb 2, 2023 at 9:25 AM Schwalbe Matthias <
Matthias.Schwalbe@viseca.ch> wrote:

> Hi Talat Uyarer,
>
>
>
>    - There is no way to have only one file unless you lower the
>    parallelism to 1 (= only one subtask)
>    - So which files do you see: 1 “_metadata” + multiple data files (or
>    just one)?
>    - The idea of having multiple files is to allow multiple threads to
>    store checkpoints at the same time and, when restarting from a
>    checkpoint, to read from more files, potentially distributed across
>    multiple physical hard drives (more I/O capacity)
>    - So in general it is good to have multiple files
>
>
>
> Still (out of curiosity) why would you want to have everything in a single
> file?
>
>
>
> Sincere greetings
>
>
>
> Thias

RE: Reducing Checkpoint Count for Chain Operator

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Talat Uyarer,


  *   There is no way to have only one file unless you lower the parallelism to 1 (= only one subtask)
  *   So which files do you see: 1 “_metadata” + multiple data files (or just one)?
  *   The idea of having multiple files is to allow multiple threads to store checkpoints at the same time and, when restarting from a checkpoint, to read from more files, potentially distributed across multiple physical hard drives (more I/O capacity)
  *   So in general it is good to have multiple files

Still (out of curiosity) why would you want to have everything in a single file?

Sincere greetings

Thias


From: Talat Uyarer <tu...@paloaltonetworks.com>
Sent: Thursday, February 2, 2023 5:57 PM
To: Schwalbe Matthias <Ma...@viseca.ch>
Cc: Kishore Pola <ki...@hotmail.com>; weijie guo <gu...@gmail.com>; user@flink.apache.org
Subject: Re: Reducing Checkpoint Count for Chain Operator


Hi Schwalbe, weijie,

Thanks for your reply.


  *   Each state primitive/per subtask stores state into a separate file

In this picture you can see an operator chain: https://nightlies.apache.org/flink/flink-docs-master/fig/tasks_chains.svg

Source and Map are in the same chain. Today Flink creates two files for that operator chain. With an OperatorChain, all subtasks run on the same machine, in the same thread, as a memory optimization. However, Flink creates separate files per subtask. Our question is whether there is a way to have one file instead of multiple files.

Thanks



This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.

Re: Reducing Checkpoint Count for Chain Operator

Posted by Talat Uyarer via user <us...@flink.apache.org>.
Hi Schwalbe, weijie,

Thanks for your reply.


>    - Each state primitive/per subtask stores state into a separate file
>
>
In this picture you can see an operator chain:
https://nightlies.apache.org/flink/flink-docs-master/fig/tasks_chains.svg

Source and Map are in the same chain. Today Flink creates two files for
that operator chain. With an OperatorChain, all subtasks run on the same
machine, in the same thread, as a memory optimization. However, Flink
creates separate files per subtask. Our question is whether there is a way
to have one file instead of multiple files.

Thanks



On Wed, Feb 1, 2023 at 11:50 PM Schwalbe Matthias <
Matthias.Schwalbe@viseca.ch> wrote:

> Hi Kishore,
>
>
>
>
>
> Having followed this thread for a while, there still seems to be quite a
> bit of confusion of concepts. In order to help resolve your original
> question we would need to know:
>
>    - *What makes your observation a problem to be solved?*
>    - You write that you have no shuffling. Does that mean you don’t use any
>    keyBy() or rebalance()?
>    - How do you determine that there are 7 checkpoints, one for each
>    operator?
>    - In general, please give a few more details about how you configure
>    state primitives: which kinds, whether you also use operator state,
>    whether on all operators, etc.
>
>
>
> In general (as Weijie said), checkpointing works like this (simplified):
>
>    - The JobManager creates a checkpoint marker/barrier at a configured
>    interval
>    - For synchronous checkpointing, this flows along with the events
>    through the chain of tasks
>    - For asynchronous checkpointing, the checkpoint marker is sent directly
>    to the subtasks
>    - A single checkpoint looks like this:
>       - Each state primitive/per subtask stores state into a separate file
>       - At the end, the JobManager writes a “_metadata” file for the
>       checkpoint metadata and for state that is too small to end up in a
>       separate file
>       - I.e. each checkpoint run generates only one checkpoint (multiple
>       files), not 7
>
>
>
> Hope we shed a little light on this
>
>
>
> Best regards
>
>
>
> Thias

RE: Reducing Checkpoint Count for Chain Operator

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Kishore,


Having followed this thread for a while, there still seems to be quite a bit of confusion of concepts. In order to help resolve your original question we would need to know:

  *   What makes your observation a problem to be solved?
  *   You write that you have no shuffling. Does that mean you don’t use any keyBy() or rebalance()?
  *   How do you determine that there are 7 checkpoints, one for each operator?
  *   In general, please give a few more details about how you configure state primitives: which kinds, whether you also use operator state, whether on all operators, etc.

In general (as Weijie said), checkpointing works like this (simplified):

  *   The JobManager creates a checkpoint marker/barrier at a configured interval
  *   For synchronous checkpointing, this flows along with the events through the chain of tasks
  *   For asynchronous checkpointing, the checkpoint marker is sent directly to the subtasks
  *   A single checkpoint looks like this:
     *   Each state primitive/per subtask stores state into a separate file
     *   At the end, the JobManager writes a “_metadata” file for the checkpoint metadata and for state that is too small to end up in a separate file
     *   I.e. each checkpoint run generates only one checkpoint (multiple files), not 7
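
The file layout described in the list above can be sketched as a small model. This is an illustration under stated assumptions, not Flink code: the function `plan_checkpoint_files`, the operator names, the state sizes, and the 20,000-byte threshold are all hypothetical. Recent Flink versions appear to expose the inlining threshold as the option `state.storage.fs.memory-threshold`; please verify the name and default against the configuration docs for your version.

```python
def plan_checkpoint_files(state_sizes, inline_threshold):
    """Split state handles into separate data files and state inlined in _metadata.

    state_sizes maps (operator, subtask) -> snapshot size in bytes.
    """
    data_files = {}
    inlined = {}
    for handle, size in state_sizes.items():
        if size == 0:
            continue  # stateless operators contribute nothing
        if size < inline_threshold:
            inlined[handle] = size  # small state is stored inside _metadata
        else:
            data_files[handle] = size  # larger state gets its own file
    return data_files, inlined


# Hypothetical chain of three operators at parallelism 2.
sizes = {
    ("source", 0): 64, ("source", 1): 64,        # small offset state
    ("map", 0): 0, ("map", 1): 0,                # stateless
    ("sink", 0): 50_000, ("sink", 1): 70_000,    # buffered records
}
data_files, inlined = plan_checkpoint_files(sizes, inline_threshold=20_000)
total_bytes = sum(data_files.values()) + sum(inlined.values())
print(len(data_files), len(inlined), total_bytes)
```

In this model the number of files depends on how many stateful operator subtasks exceed the threshold, while the total size is simply the sum of the individual state sizes.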

Hope we shed a little light on this

Best regards

Thias



From: Kishore Pola <ki...@hotmail.com>
Sent: Thursday, February 2, 2023 4:12 AM
To: weijie guo <gu...@gmail.com>; Talat Uyarer <tu...@paloaltonetworks.com>
Cc: user@flink.apache.org
Subject: Re: Reducing Checkpoint Count for Chain Operator

Hi Weijie,

In our case we have 7 operators. All 7 operators are executed as one chain within a single StreamTask. As the checkpoint barrier passes through all the operators, 7 checkpoints are stored, so our checkpoint size goes up 7 times. We are investigating whether we can checkpoint only the start operator (Kafka source) or the end operator (BQ sink); that would be enough for us, and the checkpoint size would come down. Hence the question: when the operators are executed in the same StreamTask as one chain, is it possible to checkpoint at the operator-chain or single-operator level?

Thanks,
Kishore


Re: Reducing Checkpoint Count for Chain Operator

Posted by Kishore Pola <ki...@hotmail.com>.
Hi Weijie,

In our case we have 7 operators. All 7 operators are executed as one chain within a single StreamTask. As the checkpoint barrier passes through all the operators, 7 checkpoints are stored, so our checkpoint size goes up 7 times. We are investigating whether we can checkpoint only the start operator (Kafka source) or the end operator (BQ sink); that would be enough for us, and the checkpoint size would come down. Hence the question: when the operators are executed in the same StreamTask as one chain, is it possible to checkpoint at the operator-chain or single-operator level?

Thanks,
Kishore

________________________________
From: weijie guo <gu...@gmail.com>
Sent: Wednesday, February 1, 2023 6:59 PM
To: Talat Uyarer <tu...@paloaltonetworks.com>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: Reducing Checkpoint Count for Chain Operator

Hi Talat,

In Flink, a checkpoint barrier is injected at the source and then passes through all operators in turn. Each stateful operator takes its checkpoint as the barrier passes; state is managed at operator granularity, not operator-chain granularity. So what would be the benefit of checkpointing at the granularity of an operator chain?


Best regards,

Weijie



Re: Reducing Checkpoint Count for Chain Operator

Posted by weijie guo <gu...@gmail.com>.
Hi Talat,

In Flink, a checkpoint barrier is injected at the source and then passes
through all operators in turn. Each stateful operator snapshots its state
as the barrier passes; state is managed at operator granularity, not at
operator-chain granularity. So what would be gained by checkpointing at
the granularity of an operator chain?
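
To make the barrier walk described above concrete, here is a toy Python model, not Flink code. `Operator` and `pass_barrier` are hypothetical names invented for this sketch, and the operator names and state values are made up. It assumes that an operator without state contributes nothing to the snapshot, and it says nothing about how Flink lays out checkpoint files on disk.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Operator:
    """Hypothetical stand-in for one operator in a chain (not a Flink class)."""
    name: str
    state: Optional[Dict[str, Any]] = None  # None means the operator is stateless

    def snapshot(self) -> Optional[Dict[str, Any]]:
        # Copy the state so later processing cannot mutate the snapshot.
        return None if self.state is None else dict(self.state)


def pass_barrier(chain: List[Operator], checkpoint_id: int) -> Dict[str, Any]:
    """Walk the barrier through the chain in order and collect per-operator state."""
    operator_states: Dict[str, Dict[str, Any]] = {}
    for op in chain:
        snap = op.snapshot()
        if snap is not None:  # stateless operators add nothing in this model
            operator_states[op.name] = snap
    return {"checkpoint_id": checkpoint_id, "operator_states": operator_states}


# Made-up chain: only the source and the sink hold state.
chain = [
    Operator("kafka-source", {"offset": 1042}),
    Operator("parse"),
    Operator("enrich"),
    Operator("sink", {"pending": 3}),
]
checkpoint = pass_barrier(chain, checkpoint_id=7)
print(sorted(checkpoint["operator_states"]))
```

In this model the chain boundary plays no role: the result is keyed by operator name, which mirrors the point that state is tracked per operator rather than per chain.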

Best regards,

Weijie



Re: Reducing Checkpoint Count for Chain Operator

Posted by Talat Uyarer via user <us...@flink.apache.org>.
Hi Weijie,

Thanks for replying.

Our job is a streaming job. The OperatorChain contains all operators that
are executed as one chain within a single StreamTask, but each operator
creates its own checkpoint at checkpointing time. Rather than creating a
checkpoint per operator, can I have one checkpoint per OperatorChain? That
is my question.

Thanks


Re: Reducing Checkpoint Count for Chain Operator

Posted by weijie guo <gu...@gmail.com>.
Hi Talat,

Can you elaborate on what it means to create one checkpoint object per
chain operator rather than for all operators? If you mean checkpointing
each task independently, that is not supported.


Best regards,

Weijie

