Posted to users@kafka.apache.org by Dmitry Minkovsky <dm...@gmail.com> on 2017/07/20 22:03:21 UTC

Kafka Streams: why aren't offsets being committed?

My Streams application is configured to commit offsets every 250ms:

        Properties streamsConfig = new Properties();
        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 250);


However, every time I restart my application, records that have already
been processed are re-processed, even if the application has not had data
for a long time.

My guess is that offsets are committed only when all tasks in the topology
have received input. Is this what's happening?



Thank you,
Dmitry

Re: Kafka Streams: why aren't offsets being committed?

Posted by Garrett Barton <ga...@gmail.com>.
Bingo! Thanks Dmitry, that is exactly what I'm running into. Looks like I
just have to set offsets.retention.minutes to be greater than the
log.retention.hours of my longest-retained topics (converted to minutes) to be safe.
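Garrett's rule of thumb compares two settings expressed in different units (minutes versus hours), so the hours have to be converted before comparing. A minimal sketch of that comparison in Java follows. `RetentionCheck` and `offsetsOutliveLog` are hypothetical names, not Kafka APIs, and the 1-day and 7-day values are only examples.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the rule of thumb above; not a Kafka API.
final class RetentionCheck {

    private RetentionCheck() {}

    /**
     * Returns true if committed offsets are kept longer than the topic's data.
     *
     * @param offsetsRetentionMinutes broker setting offsets.retention.minutes
     * @param logRetentionHours       topic data retention, as in log.retention.hours
     */
    static boolean offsetsOutliveLog(long offsetsRetentionMinutes, long logRetentionHours) {
        // The two settings use different units, so normalize hours to minutes first.
        long logRetentionMinutes = TimeUnit.HOURS.toMinutes(logRetentionHours);
        return offsetsRetentionMinutes > logRetentionMinutes;
    }

    public static void main(String[] args) {
        // Example values: offsets kept 1 day, data kept 7 days (168 h = 10080 min).
        System.out.println(offsetsOutliveLog(1440, 168));              // false
        // The value Dmitry set in this thread (Integer.MAX_VALUE minutes).
        System.out.println(offsetsOutliveLog(Integer.MAX_VALUE, 168)); // true
    }
}
```

The check is pure arithmetic; it does not read any broker or topic configuration, and the strict `>` simply follows the "greater than" wording above.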

On Mon, Aug 7, 2017 at 9:03 PM, Dmitry Minkovsky <dm...@gmail.com>
wrote:

> Hi Garrett,
>
> This one
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-5510
>
> Best,
> Dmitry

Re: Kafka Streams: why aren't offsets being committed?

Posted by Dmitry Minkovsky <dm...@gmail.com>.
Hi Garrett,

This one
https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-5510

Best,
Dmitry

On Mon, Aug 7, 2017 at 14:22, Garrett Barton <ga...@gmail.com> wrote:

> Dmitry, which KIP are you referring to? I see this behavior too sometimes.

Re: Kafka Streams: why aren't offsets being committed?

Posted by Garrett Barton <ga...@gmail.com>.
Dmitry, which KIP are you referring to? I see this behavior too sometimes.

On Fri, Aug 4, 2017 at 10:25 AM, Dmitry Minkovsky <dm...@gmail.com>
wrote:

> Thank you Matthias and Bill,
>
> Just want to confirm that my offsets *were* being committed, but I was
> being affected by `offsets.retention.minutes`, which I did not know about. I
> set
>
> offsets.retention.minutes=2147483647
> offsets.retention.check.interval.ms=9223372036854775807
>
> Will keep an eye on that KIP.
>
> Best,
> Dmitry

Re: Kafka Streams: why aren't offsets being committed?

Posted by Dmitry Minkovsky <dm...@gmail.com>.
Thank you Matthias and Bill,

Just want to confirm that my offsets *were* being committed, but I was
being affected by `offsets.retention.minutes`, which I did not know about. I
set

offsets.retention.minutes=2147483647
offsets.retention.check.interval.ms=9223372036854775807
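A simplified model of the failure mode discussed in this thread, sketched in Java. It assumes (1) a committed offset is dropped once `offsets.retention.minutes` has elapsed since its last commit, ignoring the extra delay from the `offsets.retention.check.interval.ms` cleanup pass, (2) an idle partition's offset is not re-committed in the meantime, and (3) `auto.offset.reset=earliest`, as the ConsumerConfig logs in this thread show. `OffsetExpiryModel` and its methods are hypothetical illustrations, not Kafka APIs.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.OptionalLong;

// Hypothetical, simplified model of broker-side offset expiry; not a Kafka API.
final class OffsetExpiryModel {

    private OffsetExpiryModel() {}

    /** The committed offset still visible at {@code now}, or empty if it has expired. */
    static OptionalLong committedOffset(long offset, Instant committedAt,
                                        Duration offsetsRetention, Instant now) {
        Instant expiresAt = committedAt.plus(offsetsRetention);
        return now.isBefore(expiresAt) ? OptionalLong.of(offset) : OptionalLong.empty();
    }

    /** Where a restarted consumer resumes, assuming auto.offset.reset=earliest. */
    static long resumePosition(OptionalLong committed, long earliestOffset) {
        return committed.orElse(earliestOffset);
    }

    public static void main(String[] args) {
        Instant commit = Instant.parse("2017-07-20T12:00:00Z");
        Duration retention = Duration.ofMinutes(1440); // example: one day

        // Restart one hour after the last commit: resume where we left off.
        Instant soon = commit.plus(Duration.ofHours(1));
        System.out.println(resumePosition(committedOffset(42, commit, retention, soon), 0)); // 42

        // Restart two days later with no newer commit: the offset has expired, so the
        // consumer starts again from the earliest offset and re-processes records 0..41.
        Instant later = commit.plus(Duration.ofDays(2));
        System.out.println(resumePosition(committedOffset(42, commit, retention, later), 0)); // 0
    }
}
```

With `offsets.retention.minutes=2147483647`, the expiry point in this model moves several thousand years past the last commit, which is the effect the settings above aim for.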

Will keep an eye on that KIP.

Best,
Dmitry

On Fri, Jul 21, 2017 at 4:31 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> >>>> My guess is that offsets are committed only when all tasks in the
> >>>> topology have received input. Is this what's happening?
>
> No. Task offsets are committed independently of each other.
>
> You can double-check the logs in DEBUG mode; they indicate when
> offsets get committed. Also check which offsets are committed via
> `bin/kafka-consumer-groups.sh` (application.id == group.id).
>
> Hope this helps.
>
>
> -Matthias

Re: Kafka Streams: why aren't offsets being committed?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
>>>> My guess is that offsets are committed only when all tasks in the
>>>> topology have received input. Is this what's happening?

No. Task offsets are committed independently of each other.

You can double-check the logs in DEBUG mode; they indicate when
offsets get committed. Also check which offsets are committed via
`bin/kafka-consumer-groups.sh` (application.id == group.id).

Hope this helps.


-Matthias

On 7/21/17 2:27 AM, Dmitry Minkovsky wrote:
> Hi Bill,
> 
>> When you say "even if the application has not had data for a long time",
>> do you have a rough idea of how long?
> 
> Minutes, hours
> 
>> What is the value of your "auto.offset.reset" configuration?
> 
> I don't specify it explicitly, but the ConsumerConfig logs indicate
> "auto.offset.reset = earliest" for all consumers the application creates.
> 
> Thank you,
> Dmitry


Re: Kafka Streams: why aren't offsets being committed?

Posted by Dmitry Minkovsky <dm...@gmail.com>.
Hi Bill,

> When you say "even if the application has not had data for a long time",
> do you have a rough idea of how long?

Minutes, hours

> What is the value of your "auto.offset.reset" configuration?

I don't specify it explicitly, but the ConsumerConfig logs indicate
"auto.offset.reset = earliest" for all consumers the application creates.

Thank you,
Dmitry


On Thu, Jul 20, 2017 at 8:07 PM, Bill Bejeck <bi...@confluent.io> wrote:

> Hi Dmitry,
>
> When you say "even if the application has not had data for a long time",
> do you have a rough idea of how long? What is the value of your
> "auto.offset.reset" configuration?
>
> Thanks,
> Bill

Re: Kafka Streams: why aren't offsets being committed?

Posted by Bill Bejeck <bi...@confluent.io>.
Hi Dmitry,

When you say "even if the application has not had data for a long time",
do you have a rough idea of how long? What is the value of your
"auto.offset.reset" configuration?

Thanks,
Bill

On Thu, Jul 20, 2017 at 6:03 PM, Dmitry Minkovsky <dm...@gmail.com>
wrote:

> My Streams application is configured to commit offsets every 250ms:
>
>         Properties streamsConfig = new Properties();
>         streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 250);
>
>
> However, every time I restart my application, records that have already
> been processed are re-processed, even if the application has not had data
> for a long time.
>
> My guess is that offsets are committed only when all tasks in the topology
> have received input. Is this what's happening?
>
>
>
> Thank you,
> Dmitry
>