Posted to users@kafka.apache.org by Pushkar Deole <pd...@gmail.com> on 2021/10/04 14:31:37 UTC

kafka streams commit.interval.ms for at-least-once too high

Hi All,

I am looking into commit.interval.ms in Kafka Streams, which is documented as
the time interval at which Streams commits offsets for the source topics.
However, with the exactly-once guarantee the default is 100 ms, whereas
with at-least-once it is 30000 ms (i.e. 30 seconds).
Why is there such a large difference between the two guarantees, and what
does it mean to have this interval as high as 30 seconds? Does it also
increase the likelihood of a higher number of duplicates on application
restarts or partition rebalances?
Does it also mean that the Streams application publishes events to the
destination topic only at this interval, delaying output to the
destination topic?

Re: kafka streams commit.interval.ms for at-least-once too high

Posted by "Matthias J. Sax" <mj...@apache.org>.
If you build it manually / from-scratch using plain consumer/producer, 
it is your responsibility to avoid duplicates and/or data loss for a 
clean shutdown case or a rebalance.
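
To make "your responsibility" a bit more concrete, here is a minimal sketch of the bookkeeping a plain consumer/producer application would need. The class ProcessedOffsets and its methods are hypothetical and not part of Kafka; partitions are modelled as plain integers to keep the example free of dependencies. In a real application the resulting offsets would typically be committed with the consumer's commitSync from ConsumerRebalanceListener.onPartitionsRevoked and again before closing the consumer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping class, not part of Kafka.
final class ProcessedOffsets {

    // partition number -> offset of the last fully processed record
    private final Map<Integer, Long> lastProcessed = new HashMap<>();

    // Call only after the output for this record has been produced successfully.
    void markProcessed(int partition, long offset) {
        lastProcessed.merge(partition, offset, Math::max);
    }

    // Offsets to commit before giving up partitions. Kafka expects the offset
    // of the next record to read, i.e. last processed offset + 1.
    Map<Integer, Long> offsetsToCommit() {
        Map<Integer, Long> result = new HashMap<>();
        lastProcessed.forEach((partition, offset) -> result.put(partition, offset + 1));
        return result;
    }

    public static void main(String[] args) {
        ProcessedOffsets tracker = new ProcessedOffsets();
        tracker.markProcessed(0, 41);
        tracker.markProcessed(1, 7);
        System.out.println("Offsets to commit: " + tracker.offsetsToCommit());
    }
}
```

If the commit on revocation or shutdown is skipped, the next owner of the partition resumes from the last committed offset and reprocesses everything after it.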

That is why we recommend using Kafka Streams for a 
consumer-process-produce pattern: it does a lot of the heavy lifting for 
you out of the box and should help you avoid bugs (it has been battle-tested 
for many years).


-Matthias

On 10/6/21 2:38 AM, Pushkar Deole wrote:
> Matthias,
> 
> Good to hear on this part that kafka streams handle this internally :  "If
> a rebalance/shutdown is triggered, Kafka Streams will stop processing new
> records and just finish processing all in-flight records. Afterwards, a
> commit happens right away for all fully processed records."
> 
> Since regular consumer-producer doesn't support this I guess, even in case
> of normal shutdown there could be duplicates. Is that correct or kafka has
> support for normal consumer-producer to handle in-flight processing and
> commit those offsets before a rebalance/shutdown occurs?

Re: kafka streams commit.interval.ms for at-least-once too high

Posted by Pushkar Deole <pd...@gmail.com>.
Matthias,

Good to hear that Kafka Streams handles this part internally: "If
a rebalance/shutdown is triggered, Kafka Streams will stop processing new
records and just finish processing all in-flight records. Afterwards, a
commit happens right away for all fully processed records."

Since a regular consumer/producer application doesn't support this, I guess
there could be duplicates even on a normal shutdown. Is that correct, or does
Kafka offer a way for a plain consumer/producer to finish in-flight processing
and commit those offsets before a rebalance/shutdown occurs?

On Wed, Oct 6, 2021 at 12:22 AM Matthias J. Sax <mj...@apache.org> wrote:

> > - By producer config, i hope you mean batching and other settings that will
> > hold off producing of events. Correct me if i'm wrong
>
> Correct.
>
> > - Not sure what you mean by throughput here, which configuration would
> > dictate that?
>
> I referred to input topic throughput. If you have higher/lower
> throughput you might get data quicker/later depending on your producer
> configs.
>
> > - Do you mean here that the kafka streams internally handles waiting on
> > processing and offset commits of events that are already consumed and being
> > processed for streams instance?
>
> If a rebalance/shutdown is triggered, Kafka Streams will stop processing
> new records and just finish processing all in-flight records.
> Afterwards, a commit happens right away for all fully processed records.
>
>
> -Matthias

Re: kafka streams commit.interval.ms for at-least-once too high

Posted by "Matthias J. Sax" <mj...@apache.org>.
> - By producer config, i hope you mean batching and other settings that will
> hold off producing of events. Correct me if i'm wrong

Correct.

> - Not sure what you mean by throughput here, which configuration would
> dictate that?

I referred to input topic throughput. If you have higher/lower 
throughput you might get data quicker/later depending on your producer 
configs.

> - Do you mean here that the kafka streams internally handles waiting on
> processing and offset commits of events that are already consumed and being
> processed for streams instance?

If a rebalance/shutdown is triggered, Kafka Streams will stop processing 
new records and just finish processing all in-flight records. 
Afterwards, a commit happens right away for all fully processed records.


-Matthias


On 10/5/21 8:35 AM, Pushkar Deole wrote:
> Matthias,
> 
> On your response "For at-least-once, you would still get output
> continuously, depending on throughput and producer configs"
> - Not sure what you mean by throughput here, which configuration would
> dictate that?
> - By producer config, i hope you mean batching and other settings that will
> hold off producing of events. Correct me if i'm wrong
> 
> On your response "For regular rebalances/restarts, a longer commit interval
> has no impact because offsets would be committed right away"
> - Do you mean here that the kafka streams internally handles waiting on
> processing and offset commits of events that are already consumed and being
> processed for streams instance?

Re: kafka streams commit.interval.ms for at-least-once too high

Posted by Pushkar Deole <pd...@gmail.com>.
Matthias,

On your response "For at-least-once, you would still get output
continuously, depending on throughput and producer configs":
- I'm not sure what you mean by throughput here. Which configuration would
dictate that?
- By producer configs, I assume you mean batching and other settings that
hold off producing of events. Correct me if I'm wrong.

On your response "For regular rebalances/restarts, a longer commit interval
has no impact because offsets would be committed right away":
- Do you mean that Kafka Streams internally waits for events that are already
consumed and being processed by the Streams instance to finish, and then
commits their offsets?

On Tue, Oct 5, 2021 at 11:43 AM Matthias J. Sax <mj...@apache.org> wrote:

> The main motivation for a shorter commit interval for EOS is
> end-to-end-latency. A Topology could consist of multiple sub-topologies
> and the end-to-end-latency for the EOS case is roughly commit-interval
> times number-of-subtopologies.
>
> For regular rebalances/restarts, a longer commit interval has no impact,
> because for a regular rebalance/restart, offsets would be committed
> right away to guarantee a clean hand-off. Only in case of failure, a
> longer commit interval can lead to larger amount of duplicates (of
> course only for at-least-once guarantees).
>
> For at-least-once, you would still get output continuously, depending on
> throughput and producer configs. Only offsets are committed each 30
> seconds by default. This continuous output is also the reason why there
> is not latency impact for at-least-once using a longer commit interval.
>
> Beside an impact on latency, there is also a throughput impact. Using a
> longer commit interval provides higher throughput.
>
>
> -Matthias

Re: kafka streams commit.interval.ms for at-least-once too high

Posted by "Matthias J. Sax" <mj...@apache.org>.
The main motivation for a shorter commit interval for EOS is 
end-to-end latency. A topology can consist of multiple sub-topologies, 
and the end-to-end latency for the EOS case is roughly the commit interval 
times the number of sub-topologies.
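
As a rough illustration of that estimate, the arithmetic can be sketched as follows. The class and method names are made up for this example and are not part of Kafka, the topology size of three is an assumption, and the formula is only the approximation stated above, not an exact model.

```java
// Hypothetical helper, not part of Kafka: rough EOS end-to-end latency estimate.
final class EosLatencyEstimate {

    // Approximation from the text: latency ~ commit interval * number of sub-topologies.
    static long estimateMs(long commitIntervalMs, int subTopologies) {
        return commitIntervalMs * subTopologies;
    }

    public static void main(String[] args) {
        int subTopologies = 3; // assumed topology size, for illustration only
        System.out.println("100 ms interval:   ~" + estimateMs(100, subTopologies) + " ms");
        System.out.println("30000 ms interval: ~" + estimateMs(30_000, subTopologies) + " ms");
    }
}
```

With three sub-topologies, the 100 ms default gives roughly 300 ms, while a 30 second interval would give roughly 90 seconds.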

For regular rebalances/restarts, a longer commit interval has no impact, 
because in that case offsets are committed right away to guarantee a 
clean hand-off. Only in case of a failure can a longer commit interval 
lead to a larger number of duplicates (and of course only for 
at-least-once guarantees).
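
To get a feel for the failure case, here is a back-of-the-envelope sketch. The helper is hypothetical and not part of Kafka, and it assumes a constant input rate and a crash just before the next commit, so it gives a rough upper bound rather than a prediction.

```java
// Hypothetical helper, not part of Kafka: rough worst-case reprocessing after a crash.
final class DuplicateEstimate {

    // Records processed since the last commit are read again after a failure.
    // Assumes a constant input rate, which real workloads rarely have.
    static long maxReprocessed(long recordsPerSecond, long commitIntervalMs) {
        return recordsPerSecond * commitIntervalMs / 1000;
    }

    public static void main(String[] args) {
        long rate = 1_000; // assumed input rate in records per second
        System.out.println("30000 ms interval: up to " + maxReprocessed(rate, 30_000) + " records");
        System.out.println("100 ms interval:   up to " + maxReprocessed(rate, 100) + " records");
    }
}
```

At an assumed 1000 records per second, a crash could cause up to about 30000 records to be reprocessed with the 30 second default, compared with about 100 for a 100 ms interval.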

For at-least-once, you would still get output continuously, depending on 
throughput and producer configs. Only offsets are committed every 30 
seconds by default. This continuous output is also the reason why there 
is no latency impact for at-least-once when using a longer commit interval.

Besides the impact on latency, there is also a throughput impact. Using a 
longer commit interval provides higher throughput.
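
If you want a different trade-off between duplicates after a failure and throughput, the interval can be overridden explicitly. The sketch below only builds the configuration with plain java.util.Properties; the keys are the standard Kafka Streams config names, while the application id, the bootstrap address, the helper class and the 5000 ms value are placeholders chosen for illustration. In a real application the Properties would be passed to the KafkaStreams constructor together with the topology.

```java
import java.util.Properties;

// Hypothetical helper class; only the config keys come from Kafka Streams.
final class CommitIntervalConfig {

    static Properties atLeastOnce(long commitIntervalMs) {
        Properties props = new Properties();
        props.put("application.id", "example-app");         // placeholder
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("processing.guarantee", "at_least_once");
        // Overrides the 30000 ms default that applies to at-least-once.
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = atLeastOnce(5_000);
        System.out.println("commit.interval.ms = " + props.getProperty("commit.interval.ms"));
    }
}
```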


-Matthias


On 10/4/21 7:31 AM, Pushkar Deole wrote:
> Hi All,
> 
> I am looking into the commit.interval.ms in kafka streams which says that
> it is the time interval at which streams would commit offsets to source
> topics.
> However for exactly-once guarantee, default of this time is 100ms whereas
> for at-least-once it is 30000ms (i.e. 30sec)
> Why is there such a huge time difference between the 2 guarantees and what
> does it mean to have this interval as high as 30 seconds, does it also
> cause more probability of higher no. of duplicates in case of application
> restarts or partition rebalance ?
> Does it mean that the streams application would also publish events to
> destination topic only at this interval which means delay in publishing
> events to destinations topic ?
>