Posted to users@kafka.apache.org by Jonathan Santilli <jo...@gmail.com> on 2019/01/14 12:31:36 UTC

Why do the offsets of the consumer-group (app-id) of my Kafka Streams Application get reset after application restart?

I have a Kafka Streams application for which, whenever I restart it, the
offsets for the topic partitions (*KTABLE-SUPPRESS-STATE-STORE*) it is
consuming get reset to 0. Hence, for all partitions, the lags increase and
the app needs to reprocess all the data.

I have ensured the lag is 1 for every partition before the restart. All
consumers that belong to that consumer-group-id (app-id) are active. The
restart is immediate; it takes around 30 seconds.

The app uses exactly-once as its processing guarantee.

I have read the answer to "How does an offset expire for an Apache Kafka
consumer group?":
<https://stackoverflow.com/questions/39131465/how-does-an-offset-expire-for-an-apache-kafka-consumer-group>

I have tried with *auto.offset.reset = latest* and *auto.offset.reset =
earliest*.

I assume that after the restart the app should pick up from the latest
committed offset for that consumer group.
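
As a point of reference, here is a minimal sketch of how the settings
mentioned above map onto configuration keys. It uses plain string keys in a
java.util.Properties object so that it runs without the Kafka libraries. The
class name, the method name, and the application id value are hypothetical.
The auto.offset.reset setting is only consulted when the group has no valid
committed offset for a partition, so changing it would not be expected to
matter while committed offsets exist.

```java
import java.util.Properties;

public class StreamsSettingsSketch {

    // Builds the settings discussed above using their plain string keys,
    // so the sketch compiles without the Kafka client libraries.
    static Properties buildSettings(String applicationId, String offsetReset) {
        Properties props = new Properties();
        // application.id is also used as the consumer group id of the Streams app.
        props.setProperty("application.id", applicationId);
        // "exactly_once" is the 2019-era value; newer releases use "exactly_once_v2".
        props.setProperty("processing.guarantee", "exactly_once");
        // Only consulted when no valid committed offset exists for a partition.
        props.setProperty("auto.offset.reset", offsetReset);
        return props;
    }

    public static void main(String[] args) {
        // "my-streams-app" is a hypothetical application id.
        Properties props = buildSettings("my-streams-app", "earliest");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```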

Is it possible to know why the offsets are getting reset to 0?

I would really appreciate any clue about this.

This is the code the app executes:

final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
        .stream(inputTopicNames, Consumed.with(..., ...)
                .withTimestampExtractor(...));

events
    .filter((k, v) -> ...)
    .flatMapValues(v -> ...)
    .flatMapValues(v -> ...)
    .selectKey((k, v) -> v)
    .groupByKey(Grouped.with(..., ...))
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
            .advanceBy(Duration.ofSeconds(windowSizeInSecs))
            .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
    .reduce((agg, next) -> {
        ...
        return agg;
    })
    .suppress(Suppressed.untilWindowCloses(
                  Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
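
Because the window size and the advance are equal, the windows above are
tumbling (non-overlapping). The sketch below, in plain Java with no Kafka
dependency, shows the arithmetic under that assumption: which window a record
timestamp falls into, and the stream time from which
Suppressed.untilWindowCloses can emit that window's final result (window end
plus grace). The class name, method names, and sample durations are
hypothetical, and timestamps are assumed to be non-negative.

```java
import java.time.Duration;

public class TumblingWindowSketch {

    // Start (inclusive) of the tumbling window containing the timestamp.
    // Windows are aligned to the epoch, so starts are multiples of the size.
    static long windowStart(long timestampMs, Duration size) {
        long sizeMs = size.toMillis();
        return timestampMs - (timestampMs % sizeMs);
    }

    // End (exclusive) of the tumbling window containing the timestamp.
    static long windowEnd(long timestampMs, Duration size) {
        return windowStart(timestampMs, size) + size.toMillis();
    }

    // Stream time at which the window is considered closed: end plus grace.
    static long windowCloseTime(long timestampMs, Duration size, Duration grace) {
        return windowEnd(timestampMs, size) + grace.toMillis();
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(60);   // stands in for windowSizeInSecs
        Duration grace = Duration.ofSeconds(10);  // stands in for windowSizeGraceInSecs
        long ts = 125_000L;                       // sample record timestamp in ms

        System.out.println(windowStart(ts, size));            // 120000
        System.out.println(windowEnd(ts, size));              // 180000
        System.out.println(windowCloseTime(ts, size, grace)); // 190000
    }
}
```

Until stream time reaches the close time, the window's intermediate result
stays in the suppression buffer, which is the state backed by the
KTABLE-SUPPRESS-STATE-STORE changelog topic.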

The offset reset happens only, and always, after a restart, and only for the
*KTABLE-SUPPRESS-STATE-STORE* internal topic created by the Kafka Streams
API.

I have tried both *exactly once* and *at least once* as the processing
guarantee.

Once again, I would really appreciate any clue about this.
P.S.: I have also posted the question on *SO*:
https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio


Cheers!
-- 
Santilli Jonathan

Re: Why do the offsets of the consumer-group (app-id) of my Kafka Streams Application get reset after application restart?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
It seems this question was cross-posted on SO:
https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio



Re: Why do the offsets of the consumer-group (app-id) of my Kafka Streams Application get reset after application restart?

Posted by Jonathan Santilli <jo...@gmail.com>.
Hello Bill, thanks a lot for the reply.
I will implement your recommendation about
*KafkaStreams#setGlobalStateRestoreListener*.

About your question:

> When you say you have used both "exactly once" and "at least once", for the
> "at least once" case did you run for a while in that mode and then restart?

Yes, I have done that, among other combinations, but I see the same behaviour.

This is what I see in the logs after restart:

INFO  [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3]
internals.StoreChangelogReader (StoreChangelogReader.java:215) -
stream-thread [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3]
No checkpoint found for task 1_8 state store
KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
*APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-8* with EOS turned
on. Reinitializing the task and restore its state from the beginning.

INFO  [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3]
internals.Fetcher (Fetcher.java:583) - [Consumer
clientId=*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3-restore-consumer,
groupId=] Resetting offset for partition
*APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-8
to offset 0*.
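
The names in these log entries can be decoded as follows. This is a minimal
sketch that assumes the usual Kafka Streams conventions: a changelog topic is
named <application.id>-<store name>-changelog, and a task id has the form
<sub-topology>_<partition>. The class and method names are hypothetical.

```java
public class ChangelogNameSketch {

    // Changelog topic name: "<application.id>-<store name>-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Task ids have the form "<sub-topology>_<partition>", e.g. "1_8".
    static int partitionOfTask(String taskId) {
        return Integer.parseInt(taskId.substring(taskId.indexOf('_') + 1));
    }

    public static void main(String[] args) {
        String topic = changelogTopic("APP-ID", "KTABLE-SUPPRESS-STATE-STORE-0000000011");
        int partition = partitionOfTask("1_8");
        // Topic-partition as it appears in the log entries above.
        System.out.println(topic + "-" + partition);
        // prints APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-8
    }
}
```

Note that the second log entry comes from the restore consumer, whose groupId
is empty, and that it refers to this changelog partition.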


Before I restart, I always check the lag of the consumer group (*APP-ID*)
reading from the output topic 'outPutTopicNameOfGroupedData' and verify that
it is 1. Immediately after the restart, once the log entries above appear,
the lag of that consumer group (*APP-ID*) on the output topic
'outPutTopicNameOfGroupedData' goes up, increasing so much that the app
reading from the 'outPutTopicNameOfGroupedData' topic re-processes the
data.

I hope someone can give me a clue; I would really appreciate it.


Cheers!
--
Jonathan



Re: Why do the offsets of the consumer-group (app-id) of my Kafka Streams Application get reset after application restart?

Posted by Bill Bejeck <bi...@confluent.io>.
Hi Jonathan,

With EOS enabled, Kafka Streams does not use checkpoint files for restoring
state stores; it will replay the data contained in the changelog topic.
But this should not affect where the input source topic(s) are read from
after a restart. Also, the changelog topics are only consumed from during a
restore (or for keeping standby tasks up to date).

When you say you have used both "exactly once" and "at least once", for the
"at least once" case did you run for a while in that mode and then restart?
You can confirm how much data, and from which offset, Streams is restoring a
state store by using a custom implementation of the StateRestoreListener
interface and setting it via KafkaStreams#setGlobalStateRestoreListener.
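
A minimal sketch of such a listener, assuming the kafka-streams library is on
the classpath. The class name LoggingRestoreListener and the wording of the
log messages are hypothetical; the three callbacks are those of the
StateRestoreListener interface.

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Logs where each state store restore starts and ends, and how many
// records are replayed from the changelog topic.
public class LoggingRestoreListener implements StateRestoreListener {

    @Override
    public void onRestoreStart(TopicPartition topicPartition, String storeName,
                               long startingOffset, long endingOffset) {
        // Called once per store and changelog partition before restoring begins.
        System.out.printf("Restore of %s from %s starting: offsets %d to %d%n",
                storeName, topicPartition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition topicPartition, String storeName,
                                long batchEndOffset, long numRestored) {
        // Called after each batch of changelog records has been applied.
        System.out.printf("Restored %d records of %s from %s, up to offset %d%n",
                numRestored, storeName, topicPartition, batchEndOffset);
    }

    @Override
    public void onRestoreEnd(TopicPartition topicPartition, String storeName,
                             long totalRestored) {
        // Called once the store is fully restored from this partition.
        System.out.printf("Restore of %s from %s finished: %d records in total%n",
                storeName, topicPartition, totalRestored);
    }
}
```

It would be registered before starting the application, for example with
streams.setGlobalStateRestoreListener(new LoggingRestoreListener()), where
streams is the KafkaStreams instance.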

-Bill

