Posted to user@flink.apache.org by Rahul Patwari <ra...@gmail.com> on 2021/04/12 10:10:51 UTC

Manage Kafka Offsets for Fault Tolerance Without Checkpointing

Hello,

*Context*:

We have a stateless Flink pipeline which reads from Kafka topics.
The pipeline has a windowing operator (used only to introduce a delay in
processing records) and Async I/O operators (used for lookup/enrichment).

"At least Once" Processing semantics is needed for the pipeline to avoid
data loss.

Checkpointing is disabled, and we currently rely on the Kafka consumer's
auto offset commit for fault tolerance.

Auto offset commit indicates that "the record is successfully read", not
that "the record is successfully processed". If the pipeline restarts after
an offset has been committed to Kafka but before the record has been
successfully processed by the Flink pipeline, the record is lost, because
it is NOT replayed when the pipeline is restarted.
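
The difference between the two commit policies can be illustrated with a toy model. This is a minimal sketch in plain Java that uses no Flink or Kafka APIs; the class name, method name, and parameters are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: no Flink or Kafka APIs are used here.
class CommitPolicyDemo {

    // Offsets 0..total-1 exist in the topic. The job crashes after reading
    // `read` records, of which only the first `processed` reached the sink.
    // Returns the offsets that are never processed once the job restarts
    // from the committed offset and then runs to completion.
    static List<Integer> lostRecords(int total, int read, int processed,
                                     boolean commitOnRead) {
        // The committed offset is the next offset to read after a restart.
        int committed = commitOnRead ? read : processed;

        boolean[] done = new boolean[total];
        for (int i = 0; i < processed; i++) {
            done[i] = true;                 // finished before the crash
        }
        for (int i = committed; i < total; i++) {
            done[i] = true;                 // replayed or read after the restart
        }

        List<Integer> lost = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            if (!done[i]) {
                lost.add(i);
            }
        }
        return lost;
    }
}
```

With five records, three read and one processed before the crash, `lostRecords(5, 3, 1, true)` returns offsets 1 and 2 as lost, while `lostRecords(5, 3, 1, false)` returns an empty list because the restart replays everything from offset 1.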

Checkpointing can solve this problem. But, since the pipeline is stateless,
we do not want to use checkpointing, which will persist all the records in
Windowing Operator and in-flight Async I/O calls.

*Question*:

We are looking for other ways to guarantee "at least once" processing
without checkpointing. One such way is to manage Kafka Offsets Externally.

We can maintain the offsets of each partition of each topic in Cassandra (or
maintain a timestamp such that all records with earlier timestamps have been
successfully processed) and configure the Kafka consumer start position
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration>
with setStartFromTimestamp() or setStartFromSpecificOffsets().
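
One way to picture the externally managed offsets is as a map from partition to the last offset confirmed as processed, from which the resume positions are derived. The following is a minimal sketch in plain Java; the class name, the "topic:partition" string key, and the idea that the store holds the last processed offset are assumptions made for illustration, and no Cassandra or Flink API is called.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: turns "last processed" offsets loaded from an
// external store into the offsets the consumer should resume from.
class ResumeOffsets {

    // Key: "topic:partition" (an assumed encoding for this sketch).
    // Value in: offset of the last record confirmed as written to the sink.
    // Value out: offset of the next record the consumer should read.
    static Map<String, Long> startOffsets(Map<String, Long> lastProcessed) {
        Map<String, Long> start = new HashMap<>();
        for (Map.Entry<String, Long> e : lastProcessed.entrySet()) {
            start.put(e.getKey(), e.getValue() + 1);
        }
        return start;
    }
}
```

The resulting values are what would be handed to a start position such as setStartFromSpecificOffsets(), which expects the next record to read for each partition. As the documentation quoted later in this thread notes, such start positions do not apply when the job is automatically restored from a failure.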

This will help if the pipeline is manually restarted (say, the JobManager
pod is restarted). *But how can we avoid data loss in case of internal
restarts?*

Has anyone used this approach?
What are other ways to guarantee "at least once" processing without
checkpointing for a stateless Flink pipeline?

Thanks,
Rahul

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

Posted by Arvid Heise <ar...@apache.org>.
Hi Rahul,

Checkpointing is Flink's way of providing processing guarantees "at least
once"/"exactly once". So your question is like asking if a car offers any
safety without you wanting to use a built-in belt and airbags. Sure you
could install your own safety features but chances are that your solution
isn't working as well as the built-in stuff.
I also see little value in adding a different mechanism to Flink in general.
Flink is already complex enough; the community and newer developers suffer
more from having too many (not so well) documented options than from having
too few options.

With that being said, let's see how you can build your own checkpointing.

> I haven't explored this approach. Wouldn't the backpressure get propagated
> upstream and affect the consumption rate from Kafka?

Yes, and that's what you have now as well. You just have two sources of
backpressure (window + async I/O). You can combine it into one source and
reduce state significantly. But tbh I haven't fully understood your window
operator and why it's necessary at all. It sounds a bit like artificially
slowing down your pipeline.

> I am new to delegating sink. Can you please help me to understand how this
> approach helps in failures or recovery to replay records from Kafka?
> Currently, I am using auto offset commit of Kafka Consumer, which I think
> commits offsets every 5 seconds, by default.
>
Afaik auto-commit commits the offsets while fetching a new batch of records.

The delegating sink would work more like Kafka Streams; it would only
commit those records that have been written. Let's say you have read records
A, B, C and you are now writing A; you would only commit the offset of A.
On failure and recovery, you would only read B, C since they are
uncommitted.
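
The commit-after-write idea can be sketched as a wrapper around the real sink. This is a minimal illustration in plain Java and not Flink's SinkFunction API; `OffsetRecord`, `DelegatingSink`, and the committer callback are hypothetical names, and the sketch assumes records reach the sink in offset order within a partition.

```java
import java.util.function.Consumer;
import java.util.function.LongConsumer;

// Hypothetical record type that carries its Kafka offset through the pipeline.
class OffsetRecord {
    final long offset;
    final String value;

    OffsetRecord(long offset, String value) {
        this.offset = offset;
        this.value = value;
    }
}

// Wraps the actual write and commits an offset only after the write
// returned without throwing.
class DelegatingSink {
    private final Consumer<String> delegate;      // the real sink write
    private final LongConsumer offsetCommitter;   // e.g. a commit to Kafka or Cassandra

    DelegatingSink(Consumer<String> delegate, LongConsumer offsetCommitter) {
        this.delegate = delegate;
        this.offsetCommitter = offsetCommitter;
    }

    void invoke(OffsetRecord record) {
        // If the write throws, the commit below is skipped, so the record
        // stays uncommitted and is read again after recovery.
        delegate.accept(record.value);
        // Commit the next offset to read, following Kafka's convention.
        offsetCommitter.accept(record.offset + 1);
    }
}
```

If records can overtake each other before the sink (for example with unordered async I/O), committing each record's offset on its own would no longer be safe; the committer would then have to track the lowest offset that is still in flight.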

The downside of this approach is that your throughput will always be
limited, as the fine-grained commit costs you quite a bit of performance (you
need to wait for the acknowledgement, so a full TCP roundtrip + Kafka broker
commit time + Kafka broker replication time).
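
To give a feel for that limit, here is a back-of-the-envelope sketch in plain Java. It assumes commits are issued one after another and ignores the time spent on the sink write itself; the class name and the example latency are assumptions, not measurements.

```java
// Rough upper bound on throughput when commits are synchronous and sequential.
class CommitThroughput {

    // commitLatencyMillis: roundtrip + broker commit + replication time per commit.
    // recordsPerCommit: 1 for the fine-grained commit described above.
    static double maxRecordsPerSecond(double commitLatencyMillis, int recordsPerCommit) {
        return recordsPerCommit * 1000.0 / commitLatencyMillis;
    }
}
```

With an assumed commit latency of 5 ms and one commit per record, a single sequential committer cannot exceed 200 records per second.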

On Tue, Apr 13, 2021 at 5:38 PM Rahul Patwari <ra...@gmail.com>
wrote:

> Hi Arvid,
>
> Thanks for your inputs. They are super helpful.
>
> Why do you need the window operator at all? Couldn't you just backpressure
>> on the async I/O by delaying the processing there?
>>
>
> I haven't explored this approach. Wouldn't the backpressure gets
> propagated upstream and the consumption rate from Kafka gets affected by it?
>
> What's keeping you from attaching the offset of the Kafka records to A, B,
>> C and write the offset when writing the record into the sink? (Probably
>> need to wrap your sink function into a delegating sink function)
>>
>
> I am new to delegating sink. Can you please help me to understand how this
> approach helps in failures or recovery to replay records from Kafka?
> Currently, I am using auto offset commit of Kafka Consumer, which I think
> commits offsets every 5 seconds, by default.
>
> Checkpointing will definitely solve the problem. But, replaying records
> from Kafka seems like a simpler approach to guarantee "at least once"
> processing throughout the life of the stateless pipeline. Replaying records
> from Kafka also seems like a simpler approach operationally.
>
> Is there no other way to guarantee "at least once" processing without
> checkpointing?
>
> Checkpointing seems like is the only approach to guarantee "at least
> once"/"exactly once" processing for stateful pipelines. But support for
> replaying records to guarantee "at least once" processing would be helpful.
>
> I know that Checkpointing has "at least once" mode. Probably we can add
> one more mode where records are replayed from Source and State is not
> checkpointed. Just a Suggestion. What are your thoughts? In this case, this
> approach will be very helpful where only Kafka offsets are checkpointed.
>
> Thanks,
> Rahul
>
>
> On Tue, Apr 13, 2021 at 7:20 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Rahul,
>>
>> This pipeline should process millions of records per day with low
>>> latency.
>>> I am avoiding Checkpointing, as the records in the Window operator and
>>> in-flight records in the Async I/O operator are persisted along with the
>>> Kafka offsets. But the records in Window and Async I/O operators can be
>>> obtained just by replaying the records from Kafka Source, which is the
>>> approach I want to take.
>>> There is Deduplication logic in the pipeline. So, I am preferring to
>>> replay records in case of failures rather than storing the incremental
>>> snapshots.
>>>
>> Did you measure how much data is being checkpointed? A few million
>> records per day should just be a few megabytes per checkpoint.
>> It sounds to me as if you would rather want your approach because it's
>> conceptually more sound but not because it is necessary.
>>
>> I'm asking because delaying records and using async I/O is increasing
>> your latency significantly anyways. So a couple of additional ms to the
>> checkpoint don't sound like it will invalidate your use case to me. It will
>> also be cheaper if you factor in your work time and will also work if you
>> ever extend your pipeline to hold state.
>> Recovery should also not be worse because you restore the records from
>> blob storage instead of fetching it from the source system.
>>
>> Also let me repeat these questions
>>
>>> 1. Why do you need the window operator at all? Couldn't you just
>>> backpressure on the async I/O by delaying the processing there?
>>>
>> 2. What's keeping you from attaching the offset of the Kafka records to
>>> A, B, C and write the offset when writing the record into the sink?
>>> (Probably need to wrap your sink function into a delegating sink function)
>>>
>>
>>
>>
>>
>> On Tue, Apr 13, 2021 at 12:33 PM Rahul Patwari <
>> rahulpatwari8383@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> Thanks for the reply.
>>>
>>> could you please help me to understand how the at least once guarantee
>>>> would work without checkpointing in your case?
>>>>
>>>
>>> This was the plan to maintain "at least once" guarantee:
>>> Logic at Sink:
>>> The DataStream on which Sink Function is applied, on the same
>>> DataStream, apply a windowing operator and compute min of the timestamps of
>>> records and persist the timestamp in Cassandra -> This will persist the
>>> record timestamp below which all the records are processed, say, every 10
>>> seconds. (The delay created by the Windowing Operator used here makes sure
>>> that the timestamp is persisted in Cassandra only after it is written to
>>> Sink)
>>> Note: There is another Windowing Operator at the Source to delay the
>>> processing of records.
>>>
>>> Logic at Source:
>>> While creating the JobGraph, read the timestamp persisted in Cassandra
>>> for each topic and configure the start position of Kafka Consumer.
>>>
>>> The problem is that the start positions are not respected when there are
>>> Automatic restarts during failures. Basically, we wanted to read the
>>> timestamp from Cassandra and start consuming from the timestamp even in
>>> case of Automatic restarts during failures.
>>>
>>> This pipeline should process millions of records per day with low
>>> latency.
>>> I am avoiding Checkpointing, as the records in the Window operator and
>>> in-flight records in the Async I/O operator are persisted along with the
>>> Kafka offsets. But the records in Window and Async I/O operators can be
>>> obtained just by replaying the records from Kafka Source, which is the
>>> approach I want to take.
>>> There is Deduplication logic in the pipeline. So, I am preferring to
>>> replay records in case of failures rather than storing the incremental
>>> snapshots.
>>>
>>> Thanks,
>>> Rahul
>>>
>>> On Tue, Apr 13, 2021 at 2:53 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Rahul,
>>>>
>>>> could you please help me to understand how the at least once guarantee
>>>> would work without checkpointing in your case?
>>>>
>>>> Let's say you read records A, B, C. You use a window to delay
>>>> processing, so let's say A passes and B, C are still in the window for the
>>>> trigger.
>>>>
>>>> Now do you want to auto commit the offset of A after it being written
>>>> in the sink? If so, what's keeping you from attaching the offset of the
>>>> Kafka records to A, B, C and write the offset when writing the record into
>>>> the sink? (Probably need to wrap your sink function into a delegating sink
>>>> function)
>>>>
>>>> The way, Flink does the checkpointing is that it checkpoints the offset
>>>> of C, and the state of the window (containing B, C) to avoid data loss. Why
>>>> is that not working for you? Which state size do you expect?
>>>>
>>>> Why do you need the window operator at all? Couldn't you just
>>>> backpressure on the async I/O by delaying the processing there? Then there
>>>> would be no need to change anything.
>>>>
>>>> On Tue, Apr 13, 2021 at 10:00 AM Roman Khachatryan <ro...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Rahul,
>>>>>
>>>>> Right. There are no workarounds as far as I know.
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>> On Mon, Apr 12, 2021 at 9:00 PM Rahul Patwari
>>>>> <ra...@gmail.com> wrote:
>>>>> >
>>>>> > Hi Roman, Arvid,
>>>>> >
>>>>> > So, to achieve "at least once" guarantee, currently, automatic
>>>>> restart of Flink should be disabled?
>>>>> > Is there any workaround to get "at least once" semantics with Flink
>>>>> Automatic restarts in this case?
>>>>> >
>>>>> > Regards,
>>>>> > Rahul
>>>>> >
>>>>> > On Mon, Apr 12, 2021 at 9:57 PM Roman Khachatryan <ro...@apache.org>
>>>>> wrote:
>>>>> >>
>>>>> >> Hi,
>>>>> >>
>>>>> >> Thanks for the clarification.
>>>>> >>
>>>>> >> > Other than managing offsets externally, Are there any other ways
>>>>> to guarantee "at least once" processing without enabling checkpointing?
>>>>> >>
>>>>> >> That's currently not possible, at least with the default connector.
>>>>> >>
>>>>> >> Regards,
>>>>> >> Roman
>>>>> >>
>>>>> >> On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
>>>>> >> <ra...@gmail.com> wrote:
>>>>> >> >
>>>>> >> > Hi Roman,
>>>>> >> >
>>>>> >> > Thanks for the reply.
>>>>> >> > This is what I meant by Internal restarts - Automatic restore of
>>>>> Flink Job from a failure. For example, pipeline restarts when Fixed delay
>>>>> or Failure Rate restart strategies are configured.
>>>>> >> >
>>>>> >> > Quoting documentation in this link - Configuring Kafka Consumer
>>>>> start position configuration
>>>>> >> >
>>>>> >> >> Note that these start position configuration methods do not
>>>>> affect the start position when the job is automatically restored from a
>>>>> failure
>>>>> >> >
>>>>> >> >
>>>>> >> >
>>>>> >> > It seems that there will be data loss even when offsets are
>>>>> managed externally when there are pipeline restarts due to a failure, say,
>>>>> an exception. On the other hand, when the pipeline is stopped and
>>>>> resubmitted(say, an upgrade), there won't be any data loss as offsets are
>>>>> retrieved from an external store and configured while starting Kafka
>>>>> Consumer.
>>>>> >> >
>>>>> >> > We do not want to enable checkpointing as the pipeline is
>>>>> stateless. We have Deduplication logic in the pipeline and the processing
>>>>> is idempotent.
>>>>> >> >
>>>>> >> > Other than managing offsets externally, Are there any other ways
>>>>> to guarantee "at least once" processing without enabling checkpointing?
>>>>> >> >
>>>>> >> > Thanks,
>>>>> >> > Rahul
>>>>> >> >
>>>>> >> > On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan <
>>>>> roman@apache.org> wrote:
>>>>> >> >>
>>>>> >> >> Hi,
>>>>> >> >>
>>>>> >> >> Could you please explain what you mean by internal restarts?
>>>>> >> >>
>>>>> >> >> If you commit offsets or timestamps from sink after emitting
>>>>> records
>>>>> >> >> to the external system then there should be no data loss.
>>>>> >> >> Otherwise (if you commit offsets earlier), you have to persist
>>>>> >> >> in-flight records to avoid data loss (i.e. enable checkpointing).
>>>>> >> >>
>>>>> >> >> Regards,
>>>>> >> >> Roman
>>>>> >> >>
>>>>> >> >> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
>>>>> >> >> <ra...@gmail.com> wrote:
>>>>> >> >> >
>>>>> >> >> > Hello,
>>>>> >> >> >
>>>>> >> >> > Context:
>>>>> >> >> >
>>>>> >> >> > We have a stateless Flink Pipeline which reads from Kafka
>>>>> topics.
>>>>> >> >> > The pipeline has a Windowing operator(Used only for
>>>>> introducing a delay in processing records) and AsyncI/O operators (used for
>>>>> Lookup/Enrichment).
>>>>> >> >> >
>>>>> >> >> > "At least Once" Processing semantics is needed for the
>>>>> pipeline to avoid data loss.
>>>>> >> >> >
>>>>> >> >> > Checkpointing is disabled and we are dependent on the auto
>>>>> offset commit of Kafka consumer for fault tolerance currently.
>>>>> >> >> >
>>>>> >> >> > As auto offset commit indicates that "the record is
>>>>> successfully read", instead of "the record is successfully processed",
>>>>> there will be data loss if there is a restart when the offset is committed
>>>>> to Kafka but not successfully processed by the Flink Pipeline, as the
>>>>> record is NOT replayed again when the pipeline is restarted.
>>>>> >> >> >
>>>>> >> >> > Checkpointing can solve this problem. But, since the pipeline
>>>>> is stateless, we do not want to use checkpointing, which will persist all
>>>>> the records in Windowing Operator and in-flight Async I/O calls.
>>>>> >> >> >
>>>>> >> >> > Question:
>>>>> >> >> >
>>>>> >> >> > We are looking for other ways to guarantee "at least once"
>>>>> processing without checkpointing. One such way is to manage Kafka Offsets
>>>>> Externally.
>>>>> >> >> >
>>>>> >> >> > We can maintain offsets of each partition of each topic in
>>>>> Cassandra(or maintain timestamp, where all records with timestamps less
>>>>> than this timestamp are successfully processed) and configure Kafka
>>>>> consumer Start Position - setStartFromTimestamp() or
>>>>> setStartFromSpecificOffsets()
>>>>> >> >> >
>>>>> >> >> > This will be helpful if the pipeline is manually restarted
>>>>> (say, JobManager pod is restarted). But, how to avoid data loss in case of
>>>>> internal restarts?
>>>>> >> >> >
>>>>> >> >> > Has anyone used this approach?
>>>>> >> >> > What are other ways to guarantee "at least once" processing
>>>>> without checkpointing for a stateless Flink pipeline?
>>>>> >> >> >
>>>>> >> >> > Thanks,
>>>>> >> >> > Rahul
>>>>>
>>>>

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

Posted by Rahul Patwari <ra...@gmail.com>.
Hi Arvid,

Thanks for your inputs. They are super helpful.

> Why do you need the window operator at all? Couldn't you just backpressure
> on the async I/O by delaying the processing there?
>

I haven't explored this approach. Wouldn't the backpressure get propagated
upstream and affect the consumption rate from Kafka?

> What's keeping you from attaching the offset of the Kafka records to A, B,
> C and write the offset when writing the record into the sink? (Probably
> need to wrap your sink function into a delegating sink function)
>

I am new to delegating sinks. Could you please help me understand how this
approach helps to replay records from Kafka on failure or recovery?
Currently, I am using the Kafka consumer's auto offset commit, which I think
commits offsets every 5 seconds by default.

Checkpointing will definitely solve the problem. But, replaying records
from Kafka seems like a simpler approach to guarantee "at least once"
processing throughout the life of the stateless pipeline. Replaying records
from Kafka also seems like a simpler approach operationally.

Is there no other way to guarantee "at least once" processing without
checkpointing?

Checkpointing seems to be the only approach to guarantee "at least
once"/"exactly once" processing for stateful pipelines. But support for
replaying records to guarantee "at least once" processing would be helpful.

I know that checkpointing has an "at least once" mode. Perhaps one more mode
could be added in which only the Kafka offsets are checkpointed, state is not
checkpointed, and records are replayed from the source on recovery. Just a
suggestion; it would be very helpful in our case. What are your thoughts?

Thanks,
Rahul


On Tue, Apr 13, 2021 at 7:20 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Rahul,
>
> This pipeline should process millions of records per day with low latency.
>> I am avoiding Checkpointing, as the records in the Window operator and
>> in-flight records in the Async I/O operator are persisted along with the
>> Kafka offsets. But the records in Window and Async I/O operators can be
>> obtained just by replaying the records from Kafka Source, which is the
>> approach I want to take.
>> There is Deduplication logic in the pipeline. So, I am preferring to
>> replay records in case of failures rather than storing the incremental
>> snapshots.
>>
> Did you measure how much data is being checkpointed? A few million records
> per day should just be a few megabytes per checkpoint.
> It sounds to me as if you would rather want your approach because it's
> conceptually more sound but not because it is necessary.
>
> I'm asking because delaying records and using async I/O is increasing your
> latency significantly anyways. So a couple of additional ms to the
> checkpoint don't sound like it will invalidate your use case to me. It will
> also be cheaper if you factor in your work time and will also work if you
> ever extend your pipeline to hold state.
> Recovery should also not be worse because you restore the records from
> blob storage instead of fetching it from the source system.
>
> Also let me repeat these questions
>
>> 1. Why do you need the window operator at all? Couldn't you just
>> backpressure on the async I/O by delaying the processing there?
>>
> 2. What's keeping you from attaching the offset of the Kafka records to A,
>> B, C and write the offset when writing the record into the sink? (Probably
>> need to wrap your sink function into a delegating sink function)
>>
>
>
>
>
> On Tue, Apr 13, 2021 at 12:33 PM Rahul Patwari <ra...@gmail.com>
> wrote:
>
>> Hi Arvid,
>>
>> Thanks for the reply.
>>
>> could you please help me to understand how the at least once guarantee
>>> would work without checkpointing in your case?
>>>
>>
>> This was the plan to maintain "at least once" guarantee:
>> Logic at Sink:
>> The DataStream on which Sink Function is applied, on the same DataStream,
>> apply a windowing operator and compute min of the timestamps of records and
>> persist the timestamp in Cassandra -> This will persist the record
>> timestamp below which all the records are processed, say, every 10 seconds.
>> (The delay created by the Windowing Operator used here makes sure that the
>> timestamp is persisted in Cassandra only after it is written to Sink)
>> Note: There is another Windowing Operator at the Source to delay the
>> processing of records.
>>
>> Logic at Source:
>> While creating the JobGraph, read the timestamp persisted in Cassandra
>> for each topic and configure the start position of Kafka Consumer.
>>
>> The problem is that the start positions are not respected when there are
>> Automatic restarts during failures. Basically, we wanted to read the
>> timestamp from Cassandra and start consuming from the timestamp even in
>> case of Automatic restarts during failures.
>>
>> This pipeline should process millions of records per day with low
>> latency.
>> I am avoiding Checkpointing, as the records in the Window operator and
>> in-flight records in the Async I/O operator are persisted along with the
>> Kafka offsets. But the records in Window and Async I/O operators can be
>> obtained just by replaying the records from Kafka Source, which is the
>> approach I want to take.
>> There is Deduplication logic in the pipeline. So, I am preferring to
>> replay records in case of failures rather than storing the incremental
>> snapshots.
>>
>> Thanks,
>> Rahul
>>
>> On Tue, Apr 13, 2021 at 2:53 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Rahul,
>>>
>>> could you please help me to understand how the at least once guarantee
>>> would work without checkpointing in your case?
>>>
>>> Let's say you read records A, B, C. You use a window to delay
>>> processing, so let's say A passes and B, C are still in the window for the
>>> trigger.
>>>
>>> Now do you want to auto commit the offset of A after it being written in
>>> the sink? If so, what's keeping you from attaching the offset of the Kafka
>>> records to A, B, C and write the offset when writing the record into the
>>> sink? (Probably need to wrap your sink function into a delegating sink
>>> function)
>>>
>>> The way, Flink does the checkpointing is that it checkpoints the offset
>>> of C, and the state of the window (containing B, C) to avoid data loss. Why
>>> is that not working for you? Which state size do you expect?
>>>
>>> Why do you need the window operator at all? Couldn't you just
>>> backpressure on the async I/O by delaying the processing there? Then there
>>> would be no need to change anything.
>>>
>>> On Tue, Apr 13, 2021 at 10:00 AM Roman Khachatryan <ro...@apache.org>
>>> wrote:
>>>
>>>> Hi Rahul,
>>>>
>>>> Right. There are no workarounds as far as I know.
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>> On Mon, Apr 12, 2021 at 9:00 PM Rahul Patwari
>>>> <ra...@gmail.com> wrote:
>>>> >
>>>> > Hi Roman, Arvid,
>>>> >
>>>> > So, to achieve "at least once" guarantee, currently, automatic
>>>> restart of Flink should be disabled?
>>>> > Is there any workaround to get "at least once" semantics with Flink
>>>> Automatic restarts in this case?
>>>> >
>>>> > Regards,
>>>> > Rahul
>>>> >
>>>> > On Mon, Apr 12, 2021 at 9:57 PM Roman Khachatryan <ro...@apache.org>
>>>> wrote:
>>>> >>
>>>> >> Hi,
>>>> >>
>>>> >> Thanks for the clarification.
>>>> >>
>>>> >> > Other than managing offsets externally, Are there any other ways
>>>> to guarantee "at least once" processing without enabling checkpointing?
>>>> >>
>>>> >> That's currently not possible, at least with the default connector.
>>>> >>
>>>> >> Regards,
>>>> >> Roman
>>>> >>
>>>> >> On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
>>>> >> <ra...@gmail.com> wrote:
>>>> >> >
>>>> >> > Hi Roman,
>>>> >> >
>>>> >> > Thanks for the reply.
>>>> >> > This is what I meant by Internal restarts - Automatic restore of
>>>> Flink Job from a failure. For example, pipeline restarts when Fixed delay
>>>> or Failure Rate restart strategies are configured.
>>>> >> >
>>>> >> > Quoting documentation in this link - Configuring Kafka Consumer
>>>> start position configuration
>>>> >> >
>>>> >> >> Note that these start position configuration methods do not
>>>> affect the start position when the job is automatically restored from a
>>>> failure
>>>> >> >
>>>> >> >
>>>> >> >
>>>> >> > It seems that there will be data loss even when offsets are
>>>> managed externally when there are pipeline restarts due to a failure, say,
>>>> an exception. On the other hand, when the pipeline is stopped and
>>>> resubmitted(say, an upgrade), there won't be any data loss as offsets are
>>>> retrieved from an external store and configured while starting Kafka
>>>> Consumer.
>>>> >> >
>>>> >> > We do not want to enable checkpointing as the pipeline is
>>>> stateless. We have Deduplication logic in the pipeline and the processing
>>>> is idempotent.
>>>> >> >
>>>> >> > Other than managing offsets externally, Are there any other ways
>>>> to guarantee "at least once" processing without enabling checkpointing?
>>>> >> >
>>>> >> > Thanks,
>>>> >> > Rahul
>>>> >> >
>>>> >> > On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan <
>>>> roman@apache.org> wrote:
>>>> >> >>
>>>> >> >> Hi,
>>>> >> >>
>>>> >> >> Could you please explain what you mean by internal restarts?
>>>> >> >>
>>>> >> >> If you commit offsets or timestamps from sink after emitting
>>>> records
>>>> >> >> to the external system then there should be no data loss.
>>>> >> >> Otherwise (if you commit offsets earlier), you have to persist
>>>> >> >> in-flight records to avoid data loss (i.e. enable checkpointing).
>>>> >> >>
>>>> >> >> Regards,
>>>> >> >> Roman
>>>> >> >>
>>>> >> >> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
>>>> >> >> <ra...@gmail.com> wrote:
>>>> >> >> >
>>>> >> >> > Hello,
>>>> >> >> >
>>>> >> >> > Context:
>>>> >> >> >
>>>> >> >> > We have a stateless Flink Pipeline which reads from Kafka
>>>> topics.
>>>> >> >> > The pipeline has a Windowing operator(Used only for introducing
>>>> a delay in processing records) and AsyncI/O operators (used for
>>>> Lookup/Enrichment).
>>>> >> >> >
>>>> >> >> > "At least Once" Processing semantics is needed for the pipeline
>>>> to avoid data loss.
>>>> >> >> >
>>>> >> >> > Checkpointing is disabled and we are dependent on the auto
>>>> offset commit of Kafka consumer for fault tolerance currently.
>>>> >> >> >
>>>> >> >> > As auto offset commit indicates that "the record is
>>>> successfully read", instead of "the record is successfully processed",
>>>> there will be data loss if there is a restart when the offset is committed
>>>> to Kafka but not successfully processed by the Flink Pipeline, as the
>>>> record is NOT replayed again when the pipeline is restarted.
>>>> >> >> >
>>>> >> >> > Checkpointing can solve this problem. But, since the pipeline
>>>> is stateless, we do not want to use checkpointing, which will persist all
>>>> the records in Windowing Operator and in-flight Async I/O calls.
>>>> >> >> >
>>>> >> >> > Question:
>>>> >> >> >
>>>> >> >> > We are looking for other ways to guarantee "at least once"
>>>> processing without checkpointing. One such way is to manage Kafka Offsets
>>>> Externally.
>>>> >> >> >
>>>> >> >> > We can maintain offsets of each partition of each topic in
>>>> Cassandra(or maintain timestamp, where all records with timestamps less
>>>> than this timestamp are successfully processed) and configure Kafka
>>>> consumer Start Position - setStartFromTimestamp() or
>>>> setStartFromSpecificOffsets()
>>>> >> >> >
>>>> >> >> > This will be helpful if the pipeline is manually restarted
>>>> (say, JobManager pod is restarted). But, how to avoid data loss in case of
>>>> internal restarts?
>>>> >> >> >
>>>> >> >> > Has anyone used this approach?
>>>> >> >> > What are other ways to guarantee "at least once" processing
>>>> without checkpointing for a stateless Flink pipeline?
>>>> >> >> >
>>>> >> >> > Thanks,
>>>> >> >> > Rahul
>>>>
>>>

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

Posted by Arvid Heise <ar...@apache.org>.
Hi Rahul,

> This pipeline should process millions of records per day with low latency.
> I am avoiding Checkpointing, as the records in the Window operator and
> in-flight records in the Async I/O operator are persisted along with the
> Kafka offsets. But the records in Window and Async I/O operators can be
> obtained just by replaying the records from Kafka Source, which is the
> approach I want to take.
> There is Deduplication logic in the pipeline. So, I am preferring to
> replay records in case of failures rather than storing the incremental
> snapshots.
>
Did you measure how much data is being checkpointed? A few million records
per day should just be a few megabytes per checkpoint.
It sounds to me as if you would rather want your approach because it's
conceptually more sound but not because it is necessary.
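
As a rough illustration of that estimate, the in-flight data is approximately the average record rate multiplied by the delay and the record size. The sketch below is plain Java; the class name and all input numbers are assumptions for illustration, since the thread does not state the actual record size or delay.

```java
// Back-of-the-envelope estimate of the records buffered by a delay,
// which is roughly what a checkpoint of a delay window would hold.
class CheckpointSizeEstimate {

    static long inFlightBytes(long recordsPerDay, long delaySeconds, long bytesPerRecord) {
        // Average rate (records per second) times the delay gives the
        // number of records buffered at any moment.
        long recordsInFlight = recordsPerDay * delaySeconds / 86_400;
        return recordsInFlight * bytesPerRecord;
    }
}
```

With an assumed 8,640,000 records per day, a 10-second delay, and 1,000 bytes per record, about 1,000 records are buffered at any time, which comes to roughly 1 MB.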

I'm asking because delaying records and using async I/O is increasing your
latency significantly anyways. So a couple of additional ms for the
checkpoint doesn't sound like it would invalidate your use case to me. It
will also be cheaper if you factor in your work time, and it will still work
if you ever extend your pipeline to hold state.
Recovery should also not be worse because you restore the records from blob
storage instead of fetching them from the source system.

Also let me repeat these questions

> 1. Why do you need the window operator at all? Couldn't you just
> backpressure on the async I/O by delaying the processing there?
>
> 2. What's keeping you from attaching the offset of the Kafka records to A,
> B, C and write the offset when writing the record into the sink? (Probably
> need to wrap your sink function into a delegating sink function)
>




On Tue, Apr 13, 2021 at 12:33 PM Rahul Patwari <ra...@gmail.com>
wrote:

> Hi Arvid,
>
> Thanks for the reply.
>
> could you please help me to understand how the at least once guarantee
>> would work without checkpointing in your case?
>>
>
> This was the plan to maintain "at least once" guarantee:
> Logic at Sink:
> The DataStream on which Sink Function is applied, on the same DataStream,
> apply a windowing operator and compute min of the timestamps of records and
> persist the timestamp in Cassandra -> This will persist the record
> timestamp below which all the records are processed, say, every 10 seconds.
> (The delay created by the Windowing Operator used here makes sure that the
> timestamp is persisted in Cassandra only after it is written to Sink)
> Note: There is another Windowing Operator at the Source to delay the
> processing of records.
>
> Logic at Source:
> While creating the JobGraph, read the timestamp persisted in Cassandra for
> each topic and configure the start position of Kafka Consumer.
>
> The problem is that the start positions are not respected when there are
> Automatic restarts during failures. Basically, we wanted to read the
> timestamp from Cassandra and start consuming from the timestamp even in
> case of Automatic restarts during failures.
>
> This pipeline should process millions of records per day with low latency.
> I am avoiding Checkpointing, as the records in the Window operator and
> in-flight records in the Async I/O operator are persisted along with the
> Kafka offsets. But the records in Window and Async I/O operators can be
> obtained just by replaying the records from Kafka Source, which is the
> approach I want to take.
> There is Deduplication logic in the pipeline. So, I am preferring to
> replay records in case of failures rather than storing the incremental
> snapshots.
>
> Thanks,
> Rahul

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

Posted by Rahul Patwari <ra...@gmail.com>.
Hi Arvid,

Thanks for the reply.

> could you please help me to understand how the at least once guarantee
> would work without checkpointing in your case?

This was the plan to maintain the "at least once" guarantee:
Logic at Sink:
On the same DataStream to which the Sink Function is applied, also apply a
windowing operator that computes the minimum of the record timestamps and
persists it in Cassandra. This persists, say, every 10 seconds, the record
timestamp below which all records have been processed.
(The delay created by this Windowing Operator makes sure that the timestamp
is persisted in Cassandra only after the records are written to the Sink.)
Note: There is another Windowing Operator at the Source to delay the
processing of records.
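
The min-per-window step of the sink-side logic can be sketched outside Flink as follows. This is a rough illustration, not the actual job: `WindowMinTimestamp` is a hypothetical class, the 10-second tumbling window is keyed on the time a record reached the sink, and the Cassandra write is left out.

```java
import java.util.TreeMap;

/** Tracks, per tumbling window, the minimum record timestamp seen at the sink. */
final class WindowMinTimestamp {
    private final long windowSizeMillis;
    // window start (time the record reached the sink) -> minimum record timestamp in that window
    private final TreeMap<Long, Long> minPerWindow = new TreeMap<>();

    WindowMinTimestamp(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    /** Registers a record that reached the sink at sinkTimeMillis. */
    void add(long sinkTimeMillis, long recordTimestampMillis) {
        long windowStart = sinkTimeMillis - Math.floorMod(sinkTimeMillis, windowSizeMillis);
        minPerWindow.merge(windowStart, recordTimestampMillis, Long::min);
    }

    /** Returns the value that would be persisted when the window fires, or null if it is empty. */
    Long minFor(long windowStart) {
        return minPerWindow.get(windowStart);
    }
}
```

The value returned for a window is the candidate restart timestamp. The sketch does not check whether records with smaller timestamps are still in flight elsewhere in the pipeline.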

Logic at Source:
While creating the JobGraph, read the timestamp persisted in Cassandra for
each topic and configure the start position of Kafka Consumer.

The problem is that the start positions are not respected when there are
Automatic restarts during failures. Basically, we wanted to read the
timestamp from Cassandra and start consuming from the timestamp even in
case of Automatic restarts during failures.

This pipeline should process millions of records per day with low latency.
I am avoiding Checkpointing, as the records in the Window operator and
in-flight records in the Async I/O operator are persisted along with the
Kafka offsets. But the records in Window and Async I/O operators can be
obtained just by replaying the records from Kafka Source, which is the
approach I want to take.
There is Deduplication logic in the pipeline, so I prefer to replay
records in case of failures rather than store incremental snapshots.

Thanks,
Rahul

On Tue, Apr 13, 2021 at 2:53 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Rahul,
>
> could you please help me to understand how the at least once guarantee
> would work without checkpointing in your case?
>
> Let's say you read records A, B, C. You use a window to delay processing,
> so let's say A passes and B, C are still in the window for the trigger.
>
> Now do you want to auto commit the offset of A after it being written in
> the sink? If so, what's keeping you from attaching the offset of the Kafka
> records to A, B, C and write the offset when writing the record into the
> sink? (Probably need to wrap your sink function into a delegating sink
> function)
>
> The way, Flink does the checkpointing is that it checkpoints the offset of
> C, and the state of the window (containing B, C) to avoid data loss. Why is
> that not working for you? Which state size do you expect?
>
> Why do you need the window operator at all? Couldn't you just backpressure
> on the async I/O by delaying the processing there? Then there would be no
> need to change anything.

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

Posted by Arvid Heise <ar...@apache.org>.
Hi Rahul,

could you please help me to understand how the at least once guarantee
would work without checkpointing in your case?

Let's say you read records A, B, C. You use a window to delay processing,
so let's say A passes and B, C are still in the window for the trigger.

Now do you want to auto commit the offset of A after it has been written to
the sink? If so, what's keeping you from attaching the offset of the Kafka
records to A, B, C and writing the offset when writing the record into the
sink? (You would probably need to wrap your sink function into a delegating
sink function.)

The way Flink does checkpointing is that it checkpoints the offset of C and
the state of the window (containing B, C) to avoid data loss. Why is that
not working for you? Which state size do you expect?
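
The A, B, C scenario can be played through with a small plain-Java sketch. It is a simplified model, not Flink code: `ReplaySimulation` and `lostOffsets` are hypothetical names, offsets start at 0, and a restart is assumed to lose everything held in the window because no state is restored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class ReplaySimulation {
    /**
     * Returns the offsets that never reach the sink if the job restarts without
     * restored operator state and resumes reading at resumeOffset.
     */
    static List<Long> lostOffsets(Set<Long> writtenToSink, long resumeOffset) {
        List<Long> lost = new ArrayList<>();
        // Everything below resumeOffset is not read again after the restart.
        for (long offset = 0; offset < resumeOffset; offset++) {
            if (!writtenToSink.contains(offset)) {
                lost.add(offset);
            }
        }
        return lost;
    }
}
```

With A, B, C at offsets 0, 1, 2 and only A written to the sink, resuming at offset 3 (offset committed on read, window state gone) returns [1, 2], so B and C are lost. Resuming at offset 1 (offset committed only after the sink write) returns an empty list, because B and C are replayed.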

Why do you need the window operator at all? Couldn't you just backpressure
on the async I/O by delaying the processing there? Then there would be no
need to change anything.

On Tue, Apr 13, 2021 at 10:00 AM Roman Khachatryan <ro...@apache.org> wrote:

> Hi Rahul,
>
> Right. There are no workarounds as far as I know.
>
> Regards,
> Roman

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

Posted by Roman Khachatryan <ro...@apache.org>.
Hi Rahul,

Right. There are no workarounds as far as I know.

Regards,
Roman

On Mon, Apr 12, 2021 at 9:00 PM Rahul Patwari
<ra...@gmail.com> wrote:
>
> Hi Roman, Arvid,
>
> So, to achieve "at least once" guarantee, currently, automatic restart of Flink should be disabled?
> Is there any workaround to get "at least once" semantics with Flink Automatic restarts in this case?
>
> Regards,
> Rahul

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

Posted by Rahul Patwari <ra...@gmail.com>.
Hi Roman, Arvid,

So, to achieve "at least once" guarantee, currently, automatic restart of
Flink should be disabled?
Is there any workaround to get "at least once" semantics with Flink
Automatic restarts in this case?

Regards,
Rahul

On Mon, Apr 12, 2021 at 9:57 PM Roman Khachatryan <ro...@apache.org> wrote:

> Hi,
>
> Thanks for the clarification.
>
> > Other than managing offsets externally, Are there any other ways to
> > guarantee "at least once" processing without enabling checkpointing?
>
> That's currently not possible, at least with the default connector.
>
> Regards,
> Roman

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

Posted by Roman Khachatryan <ro...@apache.org>.
Hi,

Thanks for the clarification.

> Other than managing offsets externally, Are there any other ways to guarantee "at least once" processing without enabling checkpointing?

That's currently not possible, at least with the default connector.

Regards,
Roman

On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
<ra...@gmail.com> wrote:
>
> Hi Roman,
>
> Thanks for the reply.
> This is what I meant by Internal restarts - Automatic restore of Flink Job from a failure. For example, pipeline restarts when Fixed delay or Failure Rate restart strategies are configured.
>
> Quoting documentation in this link - Configuring Kafka Consumer start position configuration
>
>> Note that these start position configuration methods do not affect the start position when the job is automatically restored from a failure
>
>
>
> It seems that there will be data loss even when offsets are managed externally when there are pipeline restarts due to a failure, say, an exception. On the other hand, when the pipeline is stopped and resubmitted(say, an upgrade), there won't be any data loss as offsets are retrieved from an external store and configured while starting Kafka Consumer.
>
> We do not want to enable checkpointing as the pipeline is stateless. We have Deduplication logic in the pipeline and the processing is idempotent.
>
> Other than managing offsets externally, Are there any other ways to guarantee "at least once" processing without enabling checkpointing?
>
> Thanks,
> Rahul

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

Posted by Rahul Patwari <ra...@gmail.com>.
Hi Roman,

Thanks for the reply.
By internal restarts I mean the automatic restore of a Flink job after a
failure, for example, the restarts triggered when the Fixed Delay
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html#fixed-delay-restart-strategy>
or Failure Rate
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html#failure-rate-restart-strategy>
restart strategy is configured.

Quoting the documentation on Kafka consumers start position configuration
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration>:

> Note that these start position configuration methods do not affect the
> start position when the job is automatically restored from a failure

It seems that, even when offsets are managed externally, there will be data
loss when the pipeline restarts due to a failure (say, an exception). On
the other hand, when the pipeline is stopped and resubmitted (say, for an
upgrade), there won't be any data loss, as the offsets are retrieved from
the external store and configured when starting the Kafka consumer.
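
For the stop-and-resubmit case, a minimal sketch of turning stored offsets
into start offsets, assuming the external store keeps the last successfully
processed offset per (topic, partition). The class and method names are
hypothetical. The Flink documentation states that the values passed to
setStartFromSpecificOffsets() are the next record to read, so each stored
offset has to be advanced by one.

```java
import java.util.HashMap;
import java.util.Map;

class StartOffsets {
    /**
     * Converts "last successfully processed offset" per (topic, partition)
     * into "offset to start reading from" after a resubmission.
     * The key is a (topic, partition) pair; with Flink it would be mapped
     * to a KafkaTopicPartition before calling setStartFromSpecificOffsets().
     */
    static Map<Map.Entry<String, Integer>, Long> fromLastProcessed(
            Map<Map.Entry<String, Integer>, Long> lastProcessed) {
        Map<Map.Entry<String, Integer>, Long> start = new HashMap<>();
        for (Map.Entry<Map.Entry<String, Integer>, Long> e : lastProcessed.entrySet()) {
            // The start offset is the record after the last processed one.
            start.put(e.getKey(), e.getValue() + 1);
        }
        return start;
    }
}
```

If the store instead keeps the next offset to read, the values can be passed
through unchanged.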

We do not want to enable checkpointing as the pipeline is stateless. We
have deduplication logic in the pipeline and the processing is idempotent.

Other than managing offsets externally, are there any other ways to
guarantee "at least once" processing without enabling checkpointing?

Thanks,
Rahul


Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

Posted by Roman Khachatryan <ro...@apache.org>.
Hi,

Could you please explain what you mean by internal restarts?

If you commit offsets or timestamps from the sink after emitting records
to the external system, then there should be no data loss.
Otherwise (if you commit offsets earlier), you have to persist
in-flight records to avoid data loss (i.e. enable checkpointing).
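
A minimal sketch of the first option, assuming each record reaching the sink
still carries its source partition and offset (for example, attached in a
custom deserialization schema). Because Async I/O calls may complete out of
order, the sink can only commit up to the first offset that is still in
flight. The class and method names below are hypothetical, and the sketch
assumes offsets within a partition have no gaps, which does not hold for
compacted or transactional topics.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Tracks, per partition, the next offset that is safe to commit. */
class ContiguousOffsetTracker {
    // Per partition: every offset below this value has been fully processed.
    private final Map<String, Long> nextToCommit = new HashMap<>();
    // Per partition: offsets finished out of order, waiting for a gap to close.
    private final Map<String, TreeSet<Long>> pending = new HashMap<>();

    /** Registers the first offset that will be read from a partition. */
    void start(String partition, long firstOffset) {
        nextToCommit.put(partition, firstOffset);
        pending.put(partition, new TreeSet<>());
    }

    /**
     * Marks one record as emitted to the external system and returns the
     * offset that is now safe to commit for that partition.
     */
    long markProcessed(String partition, long offset) {
        TreeSet<Long> done = pending.get(partition);
        if (done == null) {
            throw new IllegalStateException("Unknown partition: " + partition);
        }
        long next = nextToCommit.get(partition);
        if (offset < next) {
            return next; // replayed duplicate, already covered
        }
        done.add(offset);
        // Advance past every offset that is now contiguous with the front.
        while (!done.isEmpty() && done.first() == next) {
            done.pollFirst();
            next++;
        }
        nextToCommit.put(partition, next);
        return next;
    }
}
```

The value returned by markProcessed is the next offset to read, which matches
what Kafka expects as a committed offset, so it can be written to Kafka or to
an external store as-is.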

Regards,
Roman
