Posted to users@kafka.apache.org by Dmitry Minkovsky <dm...@gmail.com> on 2017/08/04 14:25:48 UTC

Re: Kafka Streams: why aren't offsets being committed?

Thank you Matthias and Bill,

Just want to confirm that my offsets *were* being committed, but I was
being affected by `offsets.retention.minutes`, which I did not know about. I
set

offsets.retention.minutes=2147483647
offsets.retention.check.interval.ms=9223372036854775807

Will keep an eye on that KIP.

Best,
Dmitry

On Fri, Jul 21, 2017 at 4:31 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> >>>> My guess is that offsets are committed only when all tasks in the
> >>>> topology have received input. Is this what's happening?
>
> No. Task offsets are committed independently from each other.
>
> You can double-check the logs in DEBUG mode; they indicate when
> offsets get committed. Also check via `bin/kafka-consumer-groups.sh`
> what offsets are committed (application.id == group.id).
>
> Hope this helps.
>
>
> -Matthias
>
> On 7/21/17 2:27 AM, Dmitry Minkovsky wrote:
> > Hi Bill,
> >
> >> When you say "even if the application has not had data for a long
> >> time" do you have a rough idea of how long?
> >
> > Minutes, hours
> >
> >> What is the value of your
> >> "auto.offset.reset" configuration?
> >
> > I don't specify it explicitly, but the ConsumerConfig logs indicate
> > "auto.offset.reset = earliest" for all consumers the application creates.
> >
> > Thank you,
> > Dmitry
> >
> >
> > On Thu, Jul 20, 2017 at 8:07 PM, Bill Bejeck <bi...@confluent.io> wrote:
> >
> >> Hi Dmitry,
> >>
> >> When you say "even if the application has not had data for a long
> >> time" do you have a rough idea of how long? What is the value of your
> >> "auto.offset.reset" configuration?
> >>
> >> Thanks,
> >> Bill
> >>
> >> On Thu, Jul 20, 2017 at 6:03 PM, Dmitry Minkovsky <dminkovsky@gmail.com>
> >> wrote:
> >>
> >>> My Streams application is configured to commit offsets every 250ms:
> >>>
> >>>         Properties streamsConfig = new Properties();
> >>>         streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 250);
> >>>
> >>>
> >>> However, every time I restart my application, records that have already
> >>> been processed are re-processed, even if the application has not had
> >>> data for a long time.
> >>>
> >>> My guess is that offsets are committed only when all tasks in the
> >>> topology have received input. Is this what's happening?
> >>>
> >>>
> >>>
> >>> Thank you,
> >>> Dmitry
> >>>
> >>
> >
>
>
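
Matthias's `bin/kafka-consumer-groups.sh` check from the exchange above can be sketched as follows; the broker address and application id are hypothetical stand-ins. The command is assembled into a string so each flag is visible; run it (e.g. with `eval`) against a live broker.

```shell
# For Kafka Streams, the consumer group id equals the application.id,
# so committed offsets can be inspected with the consumer-groups tool.
GROUP="my-streams-app"         # hypothetical application.id / group.id
BOOTSTRAP="localhost:9092"     # hypothetical broker address
CMD="bin/kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP --describe --group $GROUP"
echo "$CMD"
```

Against a live cluster, `--describe` lists the current committed offset, log end offset, and lag per partition, which confirms whether commits are actually landing.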

Re: Kafka Streams: why aren't offsets being committed?

Posted by Garrett Barton <ga...@gmail.com>.
Bingo! Thanks Dmitry, that is exactly what I'm running into. Looks like I
just have to set offsets.retention.minutes to be greater than my largest
log.retention.hours topics to be safe.
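
Garrett's rule of thumb, sketched as broker-side settings in server.properties (the values here are hypothetical, chosen only so that offset retention outlives log retention):

```properties
# server.properties (broker side) -- hypothetical values
log.retention.hours=168           # longest topic log retention: 7 days
offsets.retention.minutes=11520   # 8 days, longer than the log retention above
```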

On Mon, Aug 7, 2017 at 9:03 PM, Dmitry Minkovsky <dm...@gmail.com>
wrote:

> Hi Garrett,
>
> This one
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-5510
>
> Best,
> Dmitry
>

Re: Kafka Streams: why aren't offsets being committed?

Posted by Dmitry Minkovsky <dm...@gmail.com>.
Hi Garrett,

This one
https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-5510

Best,
Dmitry

On Mon, Aug 7, 2017 at 2:22 PM, Garrett Barton <ga...@gmail.com>:

> Dmitry, which KIP are you referring to? I see this behavior too sometimes.
>

Re: Kafka Streams: why aren't offsets being committed?

Posted by Garrett Barton <ga...@gmail.com>.
Dmitry, which KIP are you referring to? I see this behavior too sometimes.

On Fri, Aug 4, 2017 at 10:25 AM, Dmitry Minkovsky <dm...@gmail.com>
wrote:

> Thank you Matthias and Bill,
>
> Just want to confirm that my offsets *were* being committed, but I was
> being affected by `offsets.retention.minutes`, which I did not know about. I
> set
>
> offsets.retention.minutes=2147483647
> offsets.retention.check.interval.ms=9223372036854775807
>
> Will keep an eye on that KIP.
>
> Best,
> Dmitry
>
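
The 250 ms commit interval from the original question, as a minimal self-contained sketch: plain string keys stand in for the `StreamsConfig` constants, and the application id and broker address are hypothetical.

```java
import java.util.Properties;

public class StreamsCommitInterval {
    // Streams configuration equivalent to the snippet quoted in the thread:
    // StreamsConfig.COMMIT_INTERVAL_MS_CONFIG is the constant for
    // "commit.interval.ms".
    static Properties build() {
        Properties p = new Properties();
        p.put("application.id", "my-streams-app");    // also used as the group.id
        p.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        p.put("commit.interval.ms", "250");           // commit offsets every 250 ms
        return p;
    }

    public static void main(String[] args) {
        // Prints the configured commit interval
        System.out.println(build().getProperty("commit.interval.ms"));
    }
}
```

Even with a short commit interval, the committed offsets only survive on the broker as long as `offsets.retention.minutes`, which is the behavior this thread pins down.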