Posted to user@flink.apache.org by Raghavendar T S <ra...@gmail.com> on 2021/04/29 10:52:55 UTC

Flink Checkpoint for Stateless Operators

Hi Team

Assume that we have a job (checkpointing enabled) with a Kafka source and a
stateless operator that consumes events from the Kafka source.
We have a stream of records 1, 2, 3, 4, 5, ... Let's say event 1 reaches
the Flat Map operator and is being processed. Then the Kafka source
completes a successful checkpoint.
In this case, will the offset of event 1 be part of the checkpoint?
Does Flink track the event from the source through all downstream operators?
If so, and the processing of the event fails in the Flat Map (for example, a
third-party API/DB failure) after a successful checkpoint, do we need to
re-process the event manually (retry using a queue or some other business
logic)?

Job:
Kafka Source -> Flat Map

Thank you

-- 
Raghavendar T S
www.teknosrc.com


Re: Flink Checkpoint for Stateless Operators

Posted by Roman Khachatryan <ro...@apache.org>.
Hi Raghavendar,

It sounds like you don't actually have flatMap logic, in which case you
should use a sink instead of a flatMap.
Preferably one of the existing ones, as some of them already provide an
exactly-once guarantee [1].
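
To illustrate why the guarantee of the writing operator matters, here is a toy model in plain Python (not Flink code). It assumes the job restarts from the read position saved by the last completed checkpoint, so records written after that checkpoint are written again. The function name, the record shape, and the failure injection are all hypothetical. Idempotent (keyed) writes are shown only as one common way to make the replay harmless.

```python
from typing import Callable, Dict, List, Tuple

Record = Tuple[int, str]  # (key, value); hypothetical record shape


def run_with_one_failure(
    records: List[Record],
    checkpoint_every: int,
    fail_before_index: int,
    write: Callable[[Record], None],
) -> None:
    """Write every record, injecting one failure and restarting from the last checkpoint."""
    checkpointed = 0      # read position saved by the last completed checkpoint
    position = 0          # index of the next record to read
    failed_once = False
    while position < len(records):
        if not failed_once and position == fail_before_index:
            # Simulated crash: fall back to the checkpointed read position.
            failed_once = True
            position = checkpointed
            continue
        write(records[position])
        position += 1
        if position % checkpoint_every == 0:
            checkpointed = position  # toy "checkpoint completed"


records = [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")]

# Non-idempotent write: every call appends a row.
appended: List[Record] = []
run_with_one_failure(records, 2, 3, appended.append)

# Idempotent write: upsert by key, so a replayed record overwrites itself.
table: Dict[int, str] = {}


def upsert(record: Record) -> None:
    table[record[0]] = record[1]


run_with_one_failure(records, 2, 3, upsert)

print(len(appended), len(table))  # prints: 6 5
```

In this model record 3 is written twice by the appending writer, because it was written after the checkpoint at position 2 and replayed after the restart. The keyed writer ends up with one row per key.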

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/guarantees.html
Regards,
Roman



Re: Flink Checkpoint for Stateless Operators

Posted by Raghavendar T S <ra...@gmail.com>.
Hi Roman

I am only doing write operations from the flat map. Does it matter if I use
a flat map or a sink for this purpose?

Thank you


-- 
Raghavendar T S
www.teknosrc.com

Re: Flink Checkpoint for Stateless Operators

Posted by Roman Khachatryan <ro...@apache.org>.
Flink uses checkpoint barriers that are sent along the same channels as
data. Events are included in the checkpoint if they precede the
corresponding barrier (or the RPC call for sources). [1] is the
algorithm description and [2] is about integration with Kafka.
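
As a rough illustration of the barrier rule (a toy model in plain Python, not Flink's implementation), a channel can be pictured as an ordered list of records and barriers. The `Record` and `Barrier` classes and the function name below are hypothetical. The sketch assumes a single input channel and a stateless operator that handles elements strictly in order.

```python
from dataclasses import dataclass
from typing import Dict, List, Union


@dataclass(frozen=True)
class Record:
    offset: int


@dataclass(frozen=True)
class Barrier:
    checkpoint_id: int


def run_stateless_operator(channel: List[Union[Record, Barrier]]) -> Dict[int, List[int]]:
    """Return, per checkpoint id, the offsets already processed when its barrier arrived."""
    processed: List[int] = []
    seen_at_barrier: Dict[int, List[int]] = {}
    for element in channel:
        if isinstance(element, Barrier):
            # Everything that preceded the barrier on the channel is done by now.
            seen_at_barrier[element.checkpoint_id] = list(processed)
        else:
            processed.append(element.offset)
    return seen_at_barrier


channel = [
    Record(1), Record(2), Barrier(1),
    Record(3), Record(4), Record(5), Barrier(2),
    Record(6),
]
print(run_stateless_operator(channel))
# prints: {1: [1, 2], 2: [1, 2, 3, 4, 5]}
```

In this model no per-event tracking is needed: the position of the barrier in the channel alone decides which events belong to which checkpoint. Record 6 follows both barriers, so it is not covered by either checkpoint.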

> In my example, I have only 1 Source and 1 Flat Map. Do you mean to say
> that we need to use a sink instead of a flat map?
I'm not sure I understand the use case. What do you do with the results of
Flat Map?

[1] https://arxiv.org/pdf/1506.08603.pdf
[2] https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

Regards,
Roman



Re: Flink Checkpoint for Stateless Operators

Posted by Raghavendar T S <ra...@gmail.com>.
Hi Roman

In general, how does Flink track events from the source to downstream
operators? We usually emit existing events from an operator, or create a new
instance of a class and emit it. How does Flink (or the Flink source) know
which snapshot an event belongs to?

> So you don't need to re-process it manually (given that the sink provides
> exactly once guarantee).
In my example, I have only 1 Source and 1 Flat Map. Do you mean to say that
we need to use a sink instead of a flat map?

Thank you



-- 
Raghavendar T S
www.teknosrc.com

Re: Flink Checkpoint for Stateless Operators

Posted by Roman Khachatryan <ro...@apache.org>.
Hi Raghavendar,

In Flink, checkpoints are global, meaning that a checkpoint is successful
only if all operators acknowledge it. So the offset will be stored in state
and then committed to Kafka [1] only after all the tasks acknowledge that
checkpoint. At that moment, the element must have been either emitted to the
external system, stored in operator state (e.g. a window), or stored in
channel state (with unaligned checkpoints).
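
A minimal sketch of that "global" rule, written as a toy model in plain Python rather than with Flink's actual classes: the coordinator class, its method names, and the task names are all hypothetical. It only shows that the offset is committed once every expected task has acknowledged the checkpoint.

```python
from typing import Dict, Iterable, Optional, Set, Tuple


class ToyCheckpointCoordinator:
    """Completes a checkpoint only after every expected task has acknowledged it."""

    def __init__(self, task_names: Iterable[str]) -> None:
        self.expected: Set[str] = set(task_names)
        # checkpoint id -> (source offset captured at trigger time, tasks that acked)
        self.pending: Dict[int, Tuple[int, Set[str]]] = {}
        self.committed_offset: Optional[int] = None

    def trigger(self, checkpoint_id: int, source_offset: int) -> None:
        self.pending[checkpoint_id] = (source_offset, set())

    def acknowledge(self, checkpoint_id: int, task: str) -> bool:
        """Record one ack; return True if this ack completed the checkpoint."""
        offset, acked = self.pending[checkpoint_id]
        acked.add(task)
        if acked == self.expected:
            del self.pending[checkpoint_id]
            self.committed_offset = offset  # stands in for the commit to Kafka
            return True
        return False


coordinator = ToyCheckpointCoordinator(["kafka-source", "flat-map"])
coordinator.trigger(checkpoint_id=1, source_offset=2)

coordinator.acknowledge(1, "kafka-source")
print(coordinator.committed_offset)  # prints: None

coordinator.acknowledge(1, "flat-map")
print(coordinator.committed_offset)  # prints: 2
```

The source acknowledging on its own is not enough here: the offset stays uncommitted until the flat map has acknowledged the same checkpoint.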

So you don't need to re-process it manually (given that the sink provides
an exactly-once guarantee).

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration

Regards,
Roman

