Posted to users@kafka.apache.org by Upesh Desai <ud...@itrsgroup.com> on 2021/03/23 15:43:55 UTC

Kafka Streams Processor API state stores not restored via changelog topics

Hi all,

Our team thinks we discovered a bug over the weekend within the Kafka Streams / Processor API. We are running 2.7.0.

When configuring a state store backed by a changelog topic with the cleanup policy configuration set to “compact,delete”:

final StoreBuilder<KeyValueStore<k,v>> store = Stores
  .keyValueStoreBuilder(
    Stores.persistentKeyValueStore(STORE_ID),
    kSerde,
    vSerde)
  .withLoggingEnabled(Map.of(
    RETENTION_MS_CONFIG, "90000000",
    CLEANUP_POLICY_CONFIG, "compact,delete"))
  .withCachingEnabled();

Here is how we reproduced the problem:

  1.  Records are written to the state store, and subsequently produced to the changelog topic.
  2.  Stop streams application
  3.  Delete state.dir directory
  4.  Restart streams application
  5.  Confirm state store is initialized empty with no records restored from changelog

We see this problem with both in-memory and RocksDB backed state stores. For persistent state store, if the streams application is restarted without the state dir being deleted, the application still does not “restore” from the changelog, but records are still seen in the state store.

When rolling back to 2.6, we do not see this issue.

Doing some debugging in the source code, in the StoreChangelogReader class I found that the number of records to restore is always 0 based on the below snippet:


private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
    final ProcessorStateManager stateManager = changelogMetadata.stateManager;
    final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
    final TopicPartition partition = storeMetadata.changelogPartition();
    final String storeName = storeMetadata.store().name();
    final int numRecords = changelogMetadata.bufferedLimitIndex;

Where ‘changelogMetadata.bufferedLimitIndex’ always == 0.

My question to you all is, 1) Is this expected behavior? 2) If not, is it a bug?

Hope to get some clarity, and thanks in advance!

Best,
Upesh

Upesh Desai
Senior Software Developer
udesai@itrsgroup.com
www.itrsgroup.com
Internet communications are not secure and therefore the ITRS Group does not accept legal responsibility for the contents of this message. Any view or opinions presented are solely those of the author and do not necessarily represent those of the ITRS Group unless otherwise specifically stated.

Disclaimer

The information contained in this communication from the sender is confidential. It is intended solely for use by the recipient and others authorized to receive it. If you are not the recipient, you are hereby notified that any disclosure, copying, distribution or taking action in relation of the contents of this information is strictly prohibited and may be unlawful.

This email has been scanned for viruses and malware, and may have been automatically archived by Mimecast Ltd, an innovator in Software as a Service (SaaS) for business. Providing a safer and more useful place for your human generated data. Specializing in; Security, archiving and compliance. To find out more visit the Mimecast website.

Re: Kafka Streams Processor API state stores not restored via changelog topics

Posted by Guozhang Wang <wa...@gmail.com>.
Great to hear! Always a pleasure.

Guozhang

On Tue, Mar 30, 2021 at 8:04 PM Upesh Desai <ud...@itrsgroup.com> wrote:

> Hi Guozhang,
>
>
>
> We can confirm the behavior with the 2.7.1 release. Appreciate all the
> help!
>
>
>
> Cheers,
>
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *udesai@itrsgroup.com*
> <ud...@itrsgroup.com>
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang <wa...@gmail.com>
> *Date: *Tuesday, March 30, 2021 at 2:10 PM
> *To: *Users <us...@kafka.apache.org>
> *Cc: *Bart Lilje <bl...@itrsgroup.com>
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Great, I think https://issues.apache.org/jira/browse/KAFKA-12323 is indeed
> the root cause then. Note that this is only an issue with punctuation
> triggered events, where `context.timestamp()` would return 0 (and it is
> fixed in the yet-to-release 2.7.1/2.8.0).
>
> You can consider applying the patch if you could on top of 2.7.0, or wait
> for the new release; OR, if your production code does not actually use
> punctuation to write records to Kafka, then this issue would not actually
> impact you.
>
>
> Guozhang
>
> On Tue, Mar 30, 2021 at 11:56 AM Upesh Desai <ud...@itrsgroup.com> wrote:
>
> > Hi Guozhang,
> >
> >
> >
> > Great to hear we might have found the issue!
> >
> >
> >
> > To answer your question, the changelog record is generated by us calling
> > ‘store.put(key,value)’ from the punctuate callback, which makes sense then
> > because the timestamp would be 0 like you saw in your test as well.
> >
> >
> >
> > Best,
> >
> > Upesh
> >
> >
> > Upesh Desai​  |  Senior Software Developer  |  *udesai@itrsgroup.com*
> > <ud...@itrsgroup.com>
> > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> > <https://www.itrsgroup.com/>
> >
> > *From: *Guozhang Wang <wa...@gmail.com>
> > *Date: *Tuesday, March 30, 2021 at 1:00 PM
> > *To: *Users <us...@kafka.apache.org>
> > *Cc: *Bart Lilje <bl...@itrsgroup.com>
> > *Subject: *Re: Kafka Streams Processor API state stores not restored via
> > changelog topics
> >
> > Hello Upesh,
> >
> > These are super helpful logs, and I think I'm very close to the root cause
> > of it. You see, the written changelog record's timestamp is set to 0
> > (i.e. January 1st 1970 at midnight GMT), and hence given a reasonable Kafka
> > server start time (presumably in the 21st century), the retention time would
> > always be breached, causing the log deletion mechanism to trigger.
> >
> > The timestamp is set with `context.timestamp()`, which would use the
> > processing record's timestamp; but I have seen and fixed a bug (
> > https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp was
> > not populated and hence set to 0 if it was generated as part of a punctuation.
> > So my next key question is: is this changelog record generated, i.e. its
> > put call triggered, from processing an input record, or from a punctuation
> > call?
> >
> >
> > Guozhang
> >
> > On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai <ud...@itrsgroup.com>
> wrote:
> >
> > > Hi Guozhang,
> > >
> > >
> > >
> > > When testing with a 2.6.1 broker and 2.7 streams application, I see the
> > > same behavior as described before with the 2.7 broker where just after a
> > > record is written to the changelog topic, the log segment is rolled and
> > > deleted citing that the retention time has passed (the record was written
> > > to the state store at ~15:49):
> > >
> > >
> > >
> > > [2021-03-29 15:49:13,757] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable segments with base offsets [0] due to retention time 259200000ms breach (kafka.log.Log)
> > > [2021-03-29 15:49:13,761] INFO [ProducerStateManager partition=test-stream-store-changelog-4] Writing producer snapshot at offset 1 (kafka.log.ProducerStateManager)
> > > [2021-03-29 15:49:13,763] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log segment at offset 1 in 5 ms. (kafka.log.Log)
> > > [2021-03-29 15:49:13,764] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments for deletion LogSegment(baseOffset=0, size=156, lastModifiedTime=1617050940118, largestTime=0) (kafka.log.Log)
> > > [2021-03-29 15:49:13,765] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log start offset to 1 due to segment deletion (kafka.log.Log)
> > >
> > >
> > >
> > > Does this have anything to do with the *largestTime=0* mentioned in the
> > > log? This was the first and only record written to the store/changelog. Is
> > > there anything else we can try to resolve this issue or give us more
> > > insight into where this issue could originate from?
> > >
> > >
> > >
> > > Thanks,
> > > Upesh
> > >
> > >
> > > Upesh Desai​  |  Senior Software Developer  |  *udesai@itrsgroup.com*
> > > <ud...@itrsgroup.com>
> > > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> > > <https://www.itrsgroup.com/>
> > >
> > > *From: *Upesh Desai <ud...@itrsgroup.com>
> > > *Date: *Thursday, March 25, 2021 at 6:46 PM
> > > *To: *users@kafka.apache.org <us...@kafka.apache.org>
> > > *Cc: *Bart Lilje <bl...@itrsgroup.com>
> > > *Subject: *Re: Kafka Streams Processor API state stores not restored
> via
> > > changelog topics
> > >
> > > We have not tried running 2.6 brokers and 2.7 client, I will try and get
> > > back to you.
> > >
> > >
> > >
> > > We are not enabling EOS on the streams, we have it set to AT_LEAST_ONCE.
> > > The shutdowns and restarts of the stream app are clean each time.
> > >
> > >
> > >
> > > I see in the broker logs certain lines indicating that the log segment is
> > > being rolled and deleted, but I don’t see how or why this should be
> > > happening when the records were just written. See the log line snippets
> > > included in the attached file. Initially 8 records are added (offsets 0-8),
> > > followed by a single record (offset 9). They are rolled and deleted almost
> > > instantly.
> > >
> > >
> > >
> > > Best,
> > >
> > > Upesh
> > >
> > >
> > >
> > > *Upesh Desai**​*
> > >
> > > * | *
> > >
> > > Senior Software Developer
> > >
> > >  |
> > >
> > > *udesai@itrsgroup.com <ud...@itrsgroup.com>*
> > >
> > > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> > >
> > > <https://www.itrsgroup.com/>
> > >
> > > *From: *Guozhang Wang <wa...@gmail.com>
> > > *Date: *Thursday, March 25, 2021 at 6:31 PM
> > > *To: *Users <us...@kafka.apache.org>
> > > *Subject: *Re: Kafka Streams Processor API state stores not restored
> via
> > > changelog topics
> > >
> > > BTW, yes that indicates the record in the changelog was already truncated
> > > (logically). But since we only physically truncate logs by segments, which
> > > is 1GB by default, it should still be physically on the log. Are you
> > > enabling EOS on Streams, and when you shutdown the streams app, is that a
> > > clean shutdown?
> > >
> > > On Thu, Mar 25, 2021 at 4:22 PM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > That's indeed weird.
> > > >
> > > > Have you tried to run Kafka brokers with 2.6 while Kafka Streams client
> > > > with 2.7?
> > > >
> > > > On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai <ud...@itrsgroup.com>
> > > wrote:
> > > >
> > > >> Hello Guozhang,
> > > >>
> > > >>
> > > >>
> > > >> I have tried your suggestions with an inMemoryStore FYI and seen the
> > > >> following:
> > > >>
> > > >>
> > > >>
> > > >>    1. I have the record added to the state store, stopped the
> > > >>    application, and check the earliest and latest offsets via the
> > > command line
> > > >>    tools. This shows that the earliest offset is 1, and the latest
> > > offset is
> > > >>    also 1. Does this mean that the record has been marked for
> deletion
> > > >>    already? My retention.ms config is set to 3 days (259200000 ms),
> > so
> > > >>    it should not be marked for deletion if added a couple minutes
> > prior?
> > > >>    2. Following the above, this makes sense as well. When logging
> the
> > > >>    starting offset, it is not 0, but rather 1:
> > > >>
> > > >>    *topic: streamapp-teststore-changelog, partition: 4, start
> offset:
> > 1,
> > > >>    end offset: 1*
> > > >>
> > > >>
> > > >>
> > > > >> I also confirmed different behavior when we change the changelog topic
> > > > >> cleanup policy from “*compact,delete”* to just “*compact”*. We DO NOT
> > > > >> see this issue when the changelog is just set to compact. We also confirmed
> > > > >> that this does not happen when we run everything on Kafka version 2.6.
> > > >>
> > > >>
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Upesh
> > > >>
> > > >>
> > > >> Upesh Desai​  |  Senior Software Developer  |  *
> udesai@itrsgroup.com*
> > > >> <ud...@itrsgroup.com>
> > > >> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> > > >> <https://www.itrsgroup.com/>
> > > >>
> > > >> *From: *Guozhang Wang <wa...@gmail.com>
> > > >> *Date: *Thursday, March 25, 2021 at 4:01 PM
> > > >> *To: *Users <us...@kafka.apache.org>
> > > >> *Cc: *Bart Lilje <bl...@itrsgroup.com>
> > > >> *Subject: *Re: Kafka Streams Processor API state stores not restored
> > via
> > > >> changelog topics
> > > >>
> > > >> Hello Upesh,
> > > >>
> > > >> Could you confirm a few more things for me:
> > > >>
> > > > >> 1. After you stopped the application, and wiped out the state dir; check if
> > > > >> the corresponding changelog topic has one record indeed at offset 0 ---
> > > > >> this can be done via the admin#listOffsets (get the earliest and latest
> > > > >> offset, which should be 0 and 1 correspondingly).
> > > > >> 2. After you resumed the application, check from which starting position we
> > > > >> are restoring the changelog --- this can be done via implementing the
> > > > >> `stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
> > > > >> restoreEndOffset);`, should be 0
> > > > >>
> > > > >> If both of them check out fine as expected, then from the code I think
> > > > >> bufferedLimitIndex should be updated to 1.
> > > >>
> > > >>
> > > >> Guozhang
> > > >>
> > > >> On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai <ud...@itrsgroup.com>
> > > wrote:
> > > >>
> > > >> > Hi Guozhang,
> > > >> >
> > > >> >
> > > >> >
> > > > >> > Here are some of the answers to your questions I see during my testing:
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> >    1. ChangelogMetadata#restoreEndOffset == 1 ; This is expected as in my
> > > > >> >    test 1 record had been added to the store. However the numRecords variable
> > > > >> >    is still set to 0
> > > > >> >    2. For that particular test, `hasRestoredToEnd()` indeed returns true
> > > > >> >    as well. But it is confusing since the store is actually empty / that
> > > > >> >    record I added does not exist in the store when trying to check for it.
> > > > >> >    3. N/A
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > A little more information, the records we add to this store/changelog are
> > > > >> > of type <CustomKey,byte[]> where the value is always set to an empty byte
> > > > >> > array `new byte[0]`. A couple other variations I have tried are setting to
> > > > >> > a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > Hope this gives a little more clarity and hope to hear from you soon.
> > > >> >
> > > >> >
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > Upesh
> > > >> >
> > > >> >
> > > >> > Upesh Desai​  |  Senior Software Developer  |  *
> > udesai@itrsgroup.com*
> > > >> > <ud...@itrsgroup.com>
> > > >> > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> > > >> > <https://www.itrsgroup.com/>
> > > >> >
> > > >> > *From: *Guozhang Wang <wa...@gmail.com>
> > > >> > *Date: *Wednesday, March 24, 2021 at 1:37 PM
> > > >> > *To: *Users <us...@kafka.apache.org>
> > > >> > *Cc: *Bart Lilje <bl...@itrsgroup.com>
> > > >> > *Subject: *Re: Kafka Streams Processor API state stores not
> restored
> > > via
> > > >> > changelog topics
> > > >> >
> > > >> > Hello Upesh,
> > > >> >
> > > > >> > Thanks for the detailed report. I looked through the code and tried to
> > > > >> > reproduce the issue, but so far have not succeeded. I think I may need some
> > > > >> > further information from you to help my further investigation.
> > > > >> >
> > > > >> > 1) The `bufferedLimitIndex == 0` itself does not necessarily mean there's
> > > > >> > an issue, as long as it could still be bumped later (i.e. it is possible
> > > > >> > that the restore consumer has not fetched data yet). What's key though, is
> > > > >> > to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would
> > > > >> > be created with null value, and then be initialized once. ChangelogReader
> > > > >> > would stop restoring once the current offset has reached beyond this value
> > > > >> > or if this value itself is 0.
> > > > >> >
> > > > >> > 2) If `restoreEndOffset` is initialized to a non-zero value, then check if
> > > > >> > the restoration indeed completed without applying any records; this is
> > > > >> > determined as `hasRestoredToEnd()` returning true.
> > > > >> >
> > > > >> > 3) If `restoreEndOffset` is initialized to 0, then we need to check why: off
> > > > >> > the top of my head I can only think that the consumer's end offset request
> > > > >> > gets the response with 0, indicating the changelog is now empty.
> > > >> >
> > > >> >
> > > >> > Guozhang
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> > --
> > > >> > -- Guozhang
> > > >> >
> > > >> >
> > > >> >
> > > >>
> > > >>
> > > >> --
> > > >> -- Guozhang
> > > >>
> > > >>
> > > >>
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
> >
> >
>
>
> --
> -- Guozhang
>
>
>


-- 
-- Guozhang

Re: Kafka Streams Processor API state stores not restored via changelog topics

Posted by Upesh Desai <ud...@itrsgroup.com>.
Hi Guozhang,

We can confirm the behavior with the 2.7.1 release. Appreciate all the help!

Cheers,
Upesh


Upesh Desai | Senior Software Developer | udesai@itrsgroup.com
www.itrsgroup.com
From: Guozhang Wang <wa...@gmail.com>
Date: Tuesday, March 30, 2021 at 2:10 PM
To: Users <us...@kafka.apache.org>
Cc: Bart Lilje <bl...@itrsgroup.com>
Subject: Re: Kafka Streams Processor API state stores not restored via changelog topics
Great, I think https://issues.apache.org/jira/browse/KAFKA-12323 is indeed
the root cause then. Note that this is only an issue with punctuation
triggered events, where `context.timestamp()` would return 0 (and it is
fixed in the yet-to-release 2.7.1/2.8.0).

You can consider applying the patch if you could on top of 2.7.0, or wait
for the new release; OR, if your production code does not actually use
punctuation to write records to Kafka, then this issue would not actually
impact you.
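
A minimal sketch of why a record timestamp of 0 trips time-based retention right away, assuming the broker's check amounts to (now - largest record timestamp in the segment) > retention.ms. The class and method names are hypothetical and this is a simplified model, not broker code; the retention value and the stand-in for "now" (the segment's lastModifiedTime) are taken from the broker log quoted in this thread.

```java
public class RetentionBreachSketch {

    // Simplified model of time-based retention: a segment becomes deletable
    // once the age of its newest record exceeds retention.ms.
    public static boolean isRetentionBreached(long nowMs, long largestTimestampMs, long retentionMs) {
        return nowMs - largestTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        long retentionMs = 259_200_000L;     // 3 days, as reported in the broker log
        long nowMs = 1_617_050_940_118L;     // lastModifiedTime from the log, used here as "now"

        // Record written from a punctuator on 2.7.0: timestamp 0 (KAFKA-12323)
        System.out.println(isRetentionBreached(nowMs, 0L, retentionMs));     // prints true

        // Record carrying a current timestamp
        System.out.println(isRetentionBreached(nowMs, nowMs, retentionMs));  // prints false
    }
}
```

With largestTime=0 the computed age is roughly 51 years, so any realistic retention.ms is exceeded as soon as the retention check runs. This matches the "Found deletable segments ... due to retention time 259200000ms breach" line seen seconds after the write.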


Guozhang

On Tue, Mar 30, 2021 at 11:56 AM Upesh Desai <ud...@itrsgroup.com> wrote:

> Hi Guozhang,
>
>
>
> Great to hear we might have found the issue!
>
>
>
> To answer your question, the changelog record is generated by us calling
> ‘store.put(key,value)’ from the punctuate callback, which makes sense then
> because the timestamp would be 0 like you saw in your test as well.
>
>
>
> Best,
>
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *udesai@itrsgroup.com*
> <ud...@itrsgroup.com>
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang <wa...@gmail.com>
> *Date: *Tuesday, March 30, 2021 at 1:00 PM
> *To: *Users <us...@kafka.apache.org>
> *Cc: *Bart Lilje <bl...@itrsgroup.com>
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Hello Upesh,
>
> These are super helpful logs, and I think I'm very close to the root cause
> of it. You see, the written changelog record's timestamp is set to 0
> (i.e. January 1st 1970 at midnight GMT), and hence given a reasonable Kafka
> server start time (presumably in the 21st century), the retention time would
> always be breached, causing the log deletion mechanism to trigger.
>
> The timestamp is set with `context.timestamp()`, which would use the
> processing record's timestamp; but I have seen and fixed a bug (
> https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp was
> not populated and hence set to 0 if it was generated as part of a punctuation.
> So my next key question is: is this changelog record generated, i.e. its
> put call triggered, from processing an input record, or from a punctuation
> call?
>
>
> Guozhang
>
> On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai <ud...@itrsgroup.com> wrote:
>
> > Hi Guozhang,
> >
> >
> >
> > When testing with a 2.6.1 broker and 2.7 streams application, I see the
> > same behavior as described before with the 2.7 broker where just after a
> > record is written to the changelog topic, the log segment is rolled and
> > deleted citing that the retention time has passed (the record was written
> > to the state store at ~15:49):
> >
> >
> >
> > [2021-03-29 15:49:13,757] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable segments with base offsets [0] due to retention time 259200000ms breach (kafka.log.Log)
> > [2021-03-29 15:49:13,761] INFO [ProducerStateManager partition=test-stream-store-changelog-4] Writing producer snapshot at offset 1 (kafka.log.ProducerStateManager)
> > [2021-03-29 15:49:13,763] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log segment at offset 1 in 5 ms. (kafka.log.Log)
> > [2021-03-29 15:49:13,764] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments for deletion LogSegment(baseOffset=0, size=156, lastModifiedTime=1617050940118, largestTime=0) (kafka.log.Log)
> > [2021-03-29 15:49:13,765] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log start offset to 1 due to segment deletion (kafka.log.Log)
> >
> >
> >
> > Does this have anything to do with the *largestTime=0* mentioned in the
> > log? This was the first and only record written to the store/changelog. Is
> > there anything else we can try to resolve this issue or give us more
> > insight into where this issue could originate from?
> >
> >
> >
> > Thanks,
> > Upesh
> >
> >
> > Upesh Desai​  |  Senior Software Developer  |  *udesai@itrsgroup.com*
> > <ud...@itrsgroup.com>
> > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> > <https://www.itrsgroup.com/>
> >
> > *From: *Upesh Desai <ud...@itrsgroup.com>
> > *Date: *Thursday, March 25, 2021 at 6:46 PM
> > *To: *users@kafka.apache.org <us...@kafka.apache.org>
> > *Cc: *Bart Lilje <bl...@itrsgroup.com>
> > *Subject: *Re: Kafka Streams Processor API state stores not restored via
> > changelog topics
> >
> > We have not tried running 2.6 brokers and 2.7 client, I will try and get
> > back to you.
> >
> >
> >
> > We are not enabling EOS on the streams, we have it set to AT_LEAST_ONCE.
> > The shutdowns and restarts of the stream app are clean each time.
> >
> >
> >
> > I see in the broker logs certain lines indicating that the log segment is
> > being rolled and deleted, but I don’t see how or why this should be
> > happening when the records were just written. See the log line snippets
> > included in the attached file. Initially 8 records are added (offsets
> 0-8),
> > followed by a single record (offset 9). They are rolled and deleted
> almost
> > instantly.
> >
> >
> >
> > Best,
> >
> > Upesh
> >
> >
> >
> >
> > *From: *Guozhang Wang <wa...@gmail.com>
> > *Date: *Thursday, March 25, 2021 at 6:31 PM
> > *To: *Users <us...@kafka.apache.org>
> > *Subject: *Re: Kafka Streams Processor API state stores not restored via
> > changelog topics
> >
> > BTW, yes that indicates the record in the changelog was already truncated
> > (logically). But since we only physically truncate logs by segments,
> which
> > is 1GB by default, it should still be physically on the log. Are you
> > enabling EOS on Streams, and when you shutdown the streams app, is that a
> > clean shutdown?
> >
> > On Thu, Mar 25, 2021 at 4:22 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > That's indeed weird.
> > >
> > > Have you tried to run Kafka brokers with 2.6 while Kafka Streams client
> > > with 2.7?
> > >
> > > On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai <ud...@itrsgroup.com>
> > wrote:
> > >
> > >> Hello Guozhang,
> > >>
> > >>
> > >>
> > >> I have tried your suggestions with an inMemoryStore FYI and seen the
> > >> following:
> > >>
> > >>
> > >>
> > >>    1. I have the record added to the state store, stopped the
> > >>    application, and check the earliest and latest offsets via the
> > command line
> > >>    tools. This shows that the earliest offset is 1, and the latest
> > offset is
> > >>    also 1. Does this mean that the record has been marked for deletion
> > >>    already? My retention.ms config is set to 3 days (259200000 ms),
> so
> > >>    it should not be marked for deletion if added a couple minutes
> prior?
> > >>    2. Following the above, this makes sense as well. When logging the
> > >>    starting offset, it is not 0, but rather 1:
> > >>
> > >>    *topic: streamapp-teststore-changelog, partition: 4, start offset:
> 1,
> > >>    end offset: 1*
> > >>
> > >>
> > >>
> > >> I also confirmed different behavior when we change the changelog topic
> > >> cleanup policy from “*compact,delete”* to just “*compact”*. We DO NOT
> > >> see this issue when the changelog is just set to compact. We also
> > confirmed
> > >> that this does not happen when we run everything on Kafka version 2.6.
> > >>
> > >>
> > >>
> > >> Thanks,
> > >>
> > >> Upesh
> > >>
> > >>
> > >>
> > >> *From: *Guozhang Wang <wa...@gmail.com>
> > >> *Date: *Thursday, March 25, 2021 at 4:01 PM
> > >> *To: *Users <us...@kafka.apache.org>
> > >> *Cc: *Bart Lilje <bl...@itrsgroup.com>
> > >> *Subject: *Re: Kafka Streams Processor API state stores not restored
> via
> > >> changelog topics
> > >>
> > >> Hello Upesh,
> > >>
> > >> Could you confirm a few more things for me:
> > >>
> > >> 1. After you stopped the application and wiped out the state dir, check if
> > >> the corresponding changelog topic indeed has one record at offset 0 ---
> > >> this can be done via admin#listOffsets (get the earliest and latest
> > >> offsets, which should be 0 and 1 respectively).
> > >> 2. After you resumed the application, check from which starting position we
> > >> are restoring the changelog --- this can be done by implementing
> > >> `stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
> > >> restoreEndOffset);`, and it should be 0.
> > >>
> > >> If both of them check out fine as expected, then from the code I think
> > >> bufferedLimitIndex should be updated to 1.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >> On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai <ud...@itrsgroup.com>
> > wrote:
> > >>
> > >> > Hi Guozhang,
> > >> >
> > >> >
> > >> >
> > >> > Here are some of the answers to your questions I see during my
> > testing:
> > >> >
> > >> >
> > >> >
> > >> >    1. ChangelogMetadata#restoreEndOffset == 1 ; This is expected as
> in
> > >> my
> > >> >    test 1 record had been added to the store. However the numRecords
> > >> variable
> > >> >    is still set to 0
> > >> >    2. For that particular test, `hasRestoredToEnd()` indeed returns
> > true
> > >> >    as well. But it is confusing since the store is actually empty /
> > that
> > >> >    record I added does not exist in the store when trying to check
> for
> > >> it.
> > >> >    3. N/A
> > >> >
> > >> >
> > >> >
> > >> > A little more information, the records we add to this
> store/changelog
> > >> are
> > >> > of type <CustomKey,byte[]> where the value is always set to an empty
> > >> byte
> > >> > array `new byte[0]`. A couple other variations I have tried are
> > setting
> > >> to
> > >> > a non-empty static byte array such as `new byte[1]` or `new
> > byte[]{1}`.
> > >> >
> > >> >
> > >> >
> > >> > Hope this gives a little more clarity and hope to hear from you
> soon.
> > >> >
> > >> >
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Upesh
> > >> >
> > >> >
> > >> >
> > >> > *From: *Guozhang Wang <wa...@gmail.com>
> > >> > *Date: *Wednesday, March 24, 2021 at 1:37 PM
> > >> > *To: *Users <us...@kafka.apache.org>
> > >> > *Cc: *Bart Lilje <bl...@itrsgroup.com>
> > >> > *Subject: *Re: Kafka Streams Processor API state stores not restored
> > via
> > >> > changelog topics
> > >> >
> > >> > Hello Upesh,
> > >> >
> > >> > Thanks for the detailed report. I looked through the code and tried
> to
> > >> > reproduce the issue, but so far have not succeeded. I think I may
> need
> > >> some
> > >> > further information from you to help my further investigation.
> > >> >
> > >> > 1) The `bufferedLimitIndex == 0` itself does not necessarily mean there's
> > >> > an issue, as long as it could still be bumped later (i.e. it is possible
> > >> > that the restore consumer has not fetched data yet). What's key, though, is
> > >> > to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would
> > >> > be created with a null value, and then be initialized once. ChangelogReader
> > >> > would stop restoring once the current offset has reached beyond this value
> > >> > or if this value itself is 0.
> > >> >
> > >> > 2) If `restoreEndOffset` is initialized to a non-zero value, then check if
> > >> > the restoration indeed completed without applying any records; this is
> > >> > determined by `hasRestoredToEnd()` returning true.
> > >> >
> > >> > 3) If `restoreEndOffset` is initialized to 0, then we need to check why: off
> > >> > the top of my head, I can only think that the consumer's end offset request
> > >> > gets a response of 0, indicating the changelog is now empty.
> > >> >
> > >> >
> > >> > Guozhang
> > >> >
> > >> >
> > >> > On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai <ud...@itrsgroup.com>
> > >> wrote:
> > >> >
> > >> > > Hi all,
> > >> > >
> > >> > >
> > >> > >
> > >> > > Our team think we discovered a bug over the weekend within the Kafka
> > >> > > Streams / Processor API. We are running 2.7.0.
> > >> > >
> > >> > >
> > >> > >
> > >> > > When configuring a state store backed by a changelog topic with
> the
> > >> > > cleanup policy configuration set to “compact,delete”:
> > >> > >
> > >> > >
> > >> > >
> > >> > > final StoreBuilder<KeyValueStore<k,v>> store = Stores
> > >> > >   .keyValueStoreBuilder(
> > >> > >     Stores.persistentKeyValueStore(STORE_ID),
> > >> > >     kSerde,
> > >> > >     vSerde)
> > >> > >   .withLoggingEnabled(Map.of(
> > >> > >     RETENTION_MS_CONFIG, "90000000",
> > >> > >     CLEANUP_POLICY_CONFIG, "compact,delete"))
> > >> > >   .withCachingEnabled();
> > >> > >
> > >> > >
> > >> > >
> > >> > > Here is how we reproduced the problem:
> > >> > >
> > >> > >    1. Records are written to the state store, and subsequently produced
> > >> > >    to the changelog topic.
> > >> > >    2. Stop streams application
> > >> > >    3. Delete state.dir directory
> > >> > >    4. Restart streams application
> > >> > >    5. Confirm state store is initialized empty with no records restored
> > >> > >    from changelog
> > >> > >
> > >> > >
> > >> > >
> > >> > > We see this problem with both in-memory and RocksDB backed state
> > >> stores.
> > >> > > For persistent state store, if the streams application is
> restarted
> > >> > without
> > >> > > the state dir being deleted, the application still does not
> > “restore”
> > >> > from
> > >> > > the changelog, but records are still seen in the state store.
> > >> > >
> > >> > >
> > >> > >
> > >> > > When rolling back to 2.6, we do not see this issue.
> > >> > >
> > >> > >
> > >> > >
> > >> > > Doing some debugging in the source code, in the
> StoreChangelogReader
> > >> > class
> > >> > > I found that the number of records to restore is always 0 based on
> > the
> > >> > > below snippet:
> > >> > >
> > >> > >
> > >> > >
> > >> > > private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
> > >> > >     final ProcessorStateManager stateManager = changelogMetadata.stateManager;
> > >> > >     final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
> > >> > >     final TopicPartition partition = storeMetadata.changelogPartition();
> > >> > >     final String storeName = storeMetadata.store().name();
> > >> > >     final int numRecords = changelogMetadata.bufferedLimitIndex;
> > >> > >
> > >> > >
> > >> > >
> > >> > > Where ‘changelogMetadata.bufferedLimitIndex’ always == 0.
> > >> > >
> > >> > >
> > >> > >
> > >> > > My question to you all is, 1) Is this expected behavior? 2) If
> not,
> > >> is it
> > >> > > a bug?
> > >> > >
> > >> > >
> > >> > >
> > >> > > Hope to get some clarity, and thanks in advance!
> > >> > >
> > >> > >
> > >> > >
> > >> > > Best,
> > >> > > Upesh
> > >> >
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >> >
> > >> >
> > >> >
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >>
> > >>
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> > --
> > -- Guozhang
> >
> >
> >
>
>
> --
> -- Guozhang
>
>
>


--
-- Guozhang


Re: Kafka Streams Processor API state stores not restored via changelog topics

Posted by Guozhang Wang <wa...@gmail.com>.
Great, I think https://issues.apache.org/jira/browse/KAFKA-12323 is indeed
the root cause then. Note that this is only an issue with
punctuation-triggered events, where `context.timestamp()` would return 0
(it is fixed in the yet-to-be-released 2.7.1/2.8.0).
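
For illustration, here is a minimal sketch of why a record timestamp of 0
leads to almost immediate deletion under "compact,delete". It assumes a
simplified model of the broker's time-based retention check and is not the
broker's actual code. The class and method names are hypothetical; the
retention value and lastModifiedTime are taken from the broker log quoted in
this thread, and the "now" value is an assumed wall-clock time a few seconds
after the write.

```java
// Hypothetical, simplified model of time-based retention; not Kafka's own code.
final class RetentionCheckSketch {

    // A segment counts as expired once its largest record timestamp is
    // older than retention.ms relative to the current wall-clock time.
    static boolean isRetentionBreached(long nowMs, long largestTimestampMs, long retentionMs) {
        return nowMs - largestTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        long retentionMs = 259_200_000L;            // 3 days, as in the broker log
        long lastModifiedMs = 1_617_050_940_118L;   // lastModifiedTime from the broker log
        long nowMs = 1_617_050_953_757L;            // assumed: a few seconds after the write

        // Record written from a punctuation on 2.7.0: timestamp is 0 (epoch).
        System.out.println("timestamp 0 breached: "
            + isRetentionBreached(nowMs, 0L, retentionMs));

        // Record carrying a realistic timestamp: well inside the retention window.
        System.out.println("recent timestamp breached: "
            + isRetentionBreached(nowMs, lastModifiedMs, retentionMs));
    }
}
```

With a timestamp of 0 the record looks more than 50 years old, so any
realistic retention.ms is exceeded as soon as the retention check runs.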

You can consider applying the patch on top of 2.7.0 if you are able to, or
wait for the new release. Or, if your production code does not actually use
punctuation to write records to Kafka, then this issue would not actually
impact you.


Guozhang

On Tue, Mar 30, 2021 at 11:56 AM Upesh Desai <ud...@itrsgroup.com> wrote:

> Hi Guozhang,
>
>
>
> Great to hear we might have found the issue!
>
>
>
> To answer your question, the changelog record is generated by us calling
> ‘store.put(key,value)’ from the punctuate callback, which makes sense then
> because the timestamp would be 0 like you saw in your test as well.
>
>
>
> Best,
>
> Upesh
>
>
>
> *From: *Guozhang Wang <wa...@gmail.com>
> *Date: *Tuesday, March 30, 2021 at 1:00 PM
> *To: *Users <us...@kafka.apache.org>
> *Cc: *Bart Lilje <bl...@itrsgroup.com>
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Hello Upesh,
>
> These are super helpful logs, and I think I'm very close to the root cause
> of it. You see, the written changelog record's timestamp is set to 0
> (i.e. January 1st 1970 at midnight GMT), and hence given a reasonable Kafka
> server start time (presumably in the 21st century), the retention time would
> always be breached, causing the log deletion mechanism to trigger.
>
> The timestamp is set with `context.timestamp()`, which would use the
> processing record's timestamp; but I have seen and fixed a bug (
> https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp was
> not populated, and hence set to 0, if the record was generated as part of a
> punctuation. So my next key question is: is this changelog record generated,
> i.e. its put call triggered, from processing an input record, or from a
> punctuation call?
>
>
> Guozhang
>
> On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai <ud...@itrsgroup.com> wrote:
>
> > Hi Guozhang,
> >
> >
> >
> > When testing with a 2.6.1 broker and 2.7 streams application, I see the
> > same behavior as described before with the 2.7 broker, where just after a
> > record is written to the changelog topic, the log segment is rolled and
> > deleted, citing that the retention time has passed (the record was written
> > to the state store at ~15:49):
> >
> >
> >
> > [2021-03-29 15:49:13,757] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable
> > segments with base offsets [0] due to retention time 259200000ms breach
> > (kafka.log.Log)
> > [2021-03-29 15:49:13,761] INFO [ProducerStateManager
> > partition=test-stream-store-changelog-4] Writing producer snapshot at
> > offset 1 (kafka.log.ProducerStateManager)
> > [2021-03-29 15:49:13,763] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log
> > segment at offset 1 in 5 ms. (kafka.log.Log)
> > [2021-03-29 15:49:13,764] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments
> > for deletion LogSegment(baseOffset=0, size=156,
> > lastModifiedTime=1617050940118, largestTime=0) (kafka.log.Log)
> > [2021-03-29 15:49:13,765] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log start
> > offset to 1 due to segment deletion (kafka.log.Log)
> >
> >
> >
> > Does this have anything to do with the *largestTime=0* mentioned in the
> > log? This was the first and only record written to the store/changelog.
> Is
> > there anything else we can try to resolve this issue or give us more
> > insight into where this issue could originate from?
> >
> >
> >
> > Thanks,
> > Upesh
> >
> >
> >
> > *From: *Upesh Desai <ud...@itrsgroup.com>
> > *Date: *Thursday, March 25, 2021 at 6:46 PM
> > *To: *users@kafka.apache.org <us...@kafka.apache.org>
> > *Cc: *Bart Lilje <bl...@itrsgroup.com>
> > *Subject: *Re: Kafka Streams Processor API state stores not restored via
> > changelog topics
> >
> > We have not tried running 2.6 brokers and 2.7 client, I will try and get
> > back to you.
> >
> >
> >
> > We are not enabling EOS on the streams, we have it set to AT_LEAST_ONCE.
> > The shutdowns and restarts of the stream app are clean each time.
> >
> >
> >
> > I see in the broker logs certain lines indicating that the log segment is
> > being rolled and deleted, but I don’t see how or why this should be
> > happening when the records were just written. See the log line snippets
> > included in the attached file. Initially 8 records are added (offsets
> 0-8),
> > followed by a single record (offset 9). They are rolled and deleted
> almost
> > instantly.
> >
> >
> >
> > Best,
> >
> > Upesh
> >
> >
> >
> >
> > *From: *Guozhang Wang <wa...@gmail.com>
> > *Date: *Thursday, March 25, 2021 at 6:31 PM
> > *To: *Users <us...@kafka.apache.org>
> > *Subject: *Re: Kafka Streams Processor API state stores not restored via
> > changelog topics
> >
> > BTW, yes that indicates the record in the changelog was already truncated
> > (logically). But since we only physically truncate logs by segments,
> which
> > is 1GB by default, it should still be physically on the log. Are you
> > enabling EOS on Streams, and when you shutdown the streams app, is that a
> > clean shutdown?
> >
> > On Thu, Mar 25, 2021 at 4:22 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > That's indeed weird.
> > >
> > > Have you tried to run Kafka brokers with 2.6 while Kafka Streams client
> > > with 2.7?
> > >
> > > On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai <ud...@itrsgroup.com>
> > wrote:
> > >
> > >> Hello Guozhang,
> > >>
> > >>
> > >>
> > >> I have tried your suggestions with an inMemoryStore FYI and seen the
> > >> following:
> > >>
> > >>
> > >>
> > >>    1. I have the record added to the state store, stopped the
> > >>    application, and check the earliest and latest offsets via the
> > command line
> > >>    tools. This shows that the earliest offset is 1, and the latest
> > offset is
> > >>    also 1. Does this mean that the record has been marked for deletion
> > >>    already? My retention.ms config is set to 3 days (259200000 ms),
> so
> > >>    it should not be marked for deletion if added a couple minutes
> prior?
> > >>    2. Following the above, this makes sense as well. When logging the
> > >>    starting offset, it is not 0, but rather 1:
> > >>
> > >>    *topic: streamapp-teststore-changelog, partition: 4, start offset:
> 1,
> > >>    end offset: 1*
> > >>
> > >>
> > >>
> > >> I also confirmed different behavior when we change the changelog topic
> > >> cleanup policy from “*compact,delete”* to just “*compact”*. We DO NOT
> > >> see this issue when the changelog is just set to compact. We also
> > confirmed
> > >> that this does not happen when we run everything on Kafka version 2.6.
> > >>
> > >>
> > >>
> > >> Thanks,
> > >>
> > >> Upesh
> > >>
> > >>
> > >>
> > >> *From: *Guozhang Wang <wa...@gmail.com>
> > >> *Date: *Thursday, March 25, 2021 at 4:01 PM
> > >> *To: *Users <us...@kafka.apache.org>
> > >> *Cc: *Bart Lilje <bl...@itrsgroup.com>
> > >> *Subject: *Re: Kafka Streams Processor API state stores not restored
> via
> > >> changelog topics
> > >>
> > >> Hello Upesh,
> > >>
> > >> Could you confirm a few more things for me:
> > >>
> > >> 1. After you stopped the application and wiped out the state dir, check if
> > >> the corresponding changelog topic indeed has one record at offset 0 ---
> > >> this can be done via admin#listOffsets (get the earliest and latest
> > >> offsets, which should be 0 and 1 respectively).
> > >> 2. After you resumed the application, check from which starting position we
> > >> are restoring the changelog --- this can be done by implementing
> > >> `stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
> > >> restoreEndOffset);`, and it should be 0.
> > >>
> > >> If both of them check out fine as expected, then from the code I think
> > >> bufferedLimitIndex should be updated to 1.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >> On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai <ud...@itrsgroup.com>
> > wrote:
> > >>
> > >> > Hi Guozhang,
> > >> >
> > >> >
> > >> >
> > >> > Here are some of the answers to your questions I see during my
> > testing:
> > >> >
> > >> >
> > >> >
> > >> >    1. ChangelogMetadata#restoreEndOffset == 1 ; This is expected as
> in
> > >> my
> > >> >    test 1 record had been added to the store. However the numRecords
> > >> variable
> > >> >    is still set to 0
> > >> >    2. For that particular test, `hasRestoredToEnd()` indeed returns
> > true
> > >> >    as well. But it is confusing since the store is actually empty /
> > that
> > >> >    record I added does not exist in the store when trying to check
> for
> > >> it.
> > >> >    3. N/A
> > >> >
> > >> >
> > >> >
> > >> > A little more information, the records we add to this
> store/changelog
> > >> are
> > >> > of type <CustomKey,byte[]> where the value is always set to an empty
> > >> byte
> > >> > array `new byte[0]`. A couple other variations I have tried are
> > setting
> > >> to
> > >> > a non-empty static byte array such as `new byte[1]` or `new
> > byte[]{1}`.
> > >> >
> > >> >
> > >> >
> > >> > Hope this gives a little more clarity and hope to hear from you
> soon.
> > >> >
> > >> >
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Upesh
> > >> >
> > >> >
> > >> >
> > >> > *From: *Guozhang Wang <wa...@gmail.com>
> > >> > *Date: *Wednesday, March 24, 2021 at 1:37 PM
> > >> > *To: *Users <us...@kafka.apache.org>
> > >> > *Cc: *Bart Lilje <bl...@itrsgroup.com>
> > >> > *Subject: *Re: Kafka Streams Processor API state stores not restored
> > via
> > >> > changelog topics
> > >> >
> > >> > Hello Upesh,
> > >> >
> > >> > Thanks for the detailed report. I looked through the code and tried
> to
> > >> > reproduce the issue, but so far have not succeeded. I think I may
> need
> > >> some
> > >> > further information from you to help my further investigation.
> > >> >
> > >> > 1) The `bufferedLimitIndex == 0` itself does not necessarily mean there's
> > >> > an issue, as long as it could still be bumped later (i.e. it is possible
> > >> > that the restore consumer has not fetched data yet). What's key, though, is
> > >> > to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would
> > >> > be created with a null value, and then be initialized once. ChangelogReader
> > >> > would stop restoring once the current offset has reached beyond this value
> > >> > or if this value itself is 0.
> > >> >
> > >> > 2) If `restoreEndOffset` is initialized to a non-zero value, then check if
> > >> > the restoration indeed completed without applying any records; this is
> > >> > determined by `hasRestoredToEnd()` returning true.
> > >> >
> > >> > 3) If `restoreEndOffset` is initialized to 0, then we need to check why: off
> > >> > the top of my head, I can only think that the consumer's end offset request
> > >> > gets a response of 0, indicating the changelog is now empty.
> > >> >
> > >> >
> > >> > Guozhang
> > >> >
> > >> >
> > >> > On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai <ud...@itrsgroup.com>
> > >> wrote:
> > >> >
> > >> > > Hi all,
> > >> > >
> > >> > >
> > >> > >
> > >> > > Our team think we discovered a bug over the weekend within the Kafka
> > >> > > Streams / Processor API. We are running 2.7.0.
> > >> > >
> > >> > >
> > >> > >
> > >> > > When configuring a state store backed by a changelog topic with
> the
> > >> > > cleanup policy configuration set to “compact,delete”:
> > >> > >
> > >> > >
> > >> > >
> > >> > > final StoreBuilder<KeyValueStore<k,v>> store = Stores
> > >> > >   .keyValueStoreBuilder(
> > >> > >     Stores.persistentKeyValueStore(STORE_ID),
> > >> > >     kSerde,
> > >> > >     vSerde)
> > >> > >   .withLoggingEnabled(Map.of(
> > >> > >     RETENTION_MS_CONFIG, "90000000",
> > >> > >     CLEANUP_POLICY_CONFIG, "compact,delete"))
> > >> > >   .withCachingEnabled();
> > >> > >
> > >> > >
> > >> > >
> > >> > > Here is how we reproduced the problem:
> > >> > >
> > >> > >    1. Records are written to the state store, and subsequently produced
> > >> > >    to the changelog topic.
> > >> > >    2. Stop streams application
> > >> > >    3. Delete state.dir directory
> > >> > >    4. Restart streams application
> > >> > >    5. Confirm state store is initialized empty with no records restored
> > >> > >    from changelog
> > >> > >
> > >> > >
> > >> > >
> > >> > > We see this problem with both in-memory and RocksDB backed state
> > >> stores.
> > >> > > For persistent state store, if the streams application is
> restarted
> > >> > without
> > >> > > the state dir being deleted, the application still does not
> > “restore”
> > >> > from
> > >> > > the changelog, but records are still seen in the state store.
> > >> > >
> > >> > >
> > >> > >
> > >> > > When rolling back to 2.6, we do not see this issue.
> > >> > >
> > >> > >
> > >> > >
> > >> > > Doing some debugging in the source code, in the
> StoreChangelogReader
> > >> > class
> > >> > > I found that the number of records to restore is always 0 based on
> > the
> > >> > > below snippet:
> > >> > >
> > >> > >
> > >> > >
> > >> > > private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
> > >> > >     final ProcessorStateManager stateManager = changelogMetadata.stateManager;
> > >> > >     final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
> > >> > >     final TopicPartition partition = storeMetadata.changelogPartition();
> > >> > >     final String storeName = storeMetadata.store().name();
> > >> > >     final int numRecords = changelogMetadata.bufferedLimitIndex;
> > >> > >
> > >> > >
> > >> > >
> > >> > > Where ‘changelogMetadata.bufferedLimitIndex’ always == 0.
> > >> > >
> > >> > >
> > >> > >
> > >> > > My question to you all is, 1) Is this expected behavior? 2) If
> not,
> > >> is it
> > >> > > a bug?
> > >> > >
> > >> > >
> > >> > >
> > >> > > Hope to get some clarity, and thanks in advance!
> > >> > >
> > >> > >
> > >> > >
> > >> > > Best,
> > >> > > Upesh
> > >> > > <https://www.itrsgroup.com/>
> > >> > > Upesh Desai​
> > >> > > Senior Software Developer
> > >> > > *udesai@itrsgroup.com* <ud...@itrsgroup.com>
> > >> > > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> > >> > >
> > >> >
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >> >
> > >> >
> > >> >
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >>
> > >>
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> > --
> > -- Guozhang
> >
> >
> >
>
>
> --
> -- Guozhang
>
>
>


-- 
-- Guozhang

Re: Kafka Streams Processor API state stores not restored via changelog topics

Posted by Upesh Desai <ud...@itrsgroup.com>.
Hi Guozhang,

Great to hear we might have found the issue!

To answer your question: the changelog record is generated by our call to `store.put(key, value)` from the punctuate callback. That fits your explanation, since the timestamp would then be 0, as you saw in your test as well.
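
As a rough illustration of why the call site matters, here is a toy Java model of the symptom discussed in this thread (see KAFKA-12323, quoted below). It is only a sketch: `ToyContext`, `processRecord`, `punctuate` and `demo` are hypothetical names, not Kafka classes or methods, and the model simply assumes that a timestamp is populated while an input record is being processed and left at 0 otherwise.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: ToyContext and its methods are hypothetical and are NOT
// Kafka's ProcessorContext. It mimics the symptom discussed in this thread.
final class PunctuationTimestampSketch {

    static final class ToyContext {
        // Timestamp of the record currently being processed; stays at 0
        // whenever nothing populates it.
        private long currentTimestampMs = 0L;
        // Timestamps attached to the changelog records written so far.
        final List<Long> changelogTimestamps = new ArrayList<>();

        // put() triggered while processing an input record.
        void processRecord(long recordTimestampMs) {
            currentTimestampMs = recordTimestampMs;
            put();
            currentTimestampMs = 0L;
        }

        // put() triggered from a punctuation: no input record is in flight,
        // so in this model the timestamp is never populated.
        void punctuate() {
            put();
        }

        private void put() {
            changelogTimestamps.add(currentTimestampMs);
        }
    }

    // Returns the changelog timestamps for one record-driven put followed by
    // one punctuation-driven put.
    static List<Long> demo(long recordTimestampMs) {
        ToyContext context = new ToyContext();
        context.processRecord(recordTimestampMs);
        context.punctuate();
        return context.changelogTimestamps;
    }

    public static void main(String[] args) {
        System.out.println(demo(1_617_050_940_118L));
    }
}
```

In this model the record-driven put carries the input record's timestamp, while the punctuation-driven put carries 0, which is the pattern we see in our changelog.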

Best,
Upesh


Upesh Desai | Senior Software Developer | udesai@itrsgroup.com
www.itrsgroup.com
From: Guozhang Wang <wa...@gmail.com>
Date: Tuesday, March 30, 2021 at 1:00 PM
To: Users <us...@kafka.apache.org>
Cc: Bart Lilje <bl...@itrsgroup.com>
Subject: Re: Kafka Streams Processor API state stores not restored via changelog topics
Hello Upesh,

These are super helpful logs, and I think I'm very close to the root cause
of it. You see, the written changelog record's timestamp is set to 0
(i.e. January 1st 1970 at midnight GMT), and hence given a reasonable Kafka
server start time (presumably in the 21st century), the retention time would
always be breached, causing the log deletion mechanism to trigger.

The timestamp is set with `context.timestamp()`, which would use the
processing record's timestamp; but I have seen and fixed a bug (
https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp was
not populated, and hence set to 0, if the record was generated as part of a
punctuation. So my next key question is: is this changelog record generated,
i.e. its put call triggered, from processing an input record, or from a
punctuation call?


Guozhang

On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai <ud...@itrsgroup.com> wrote:

> Hi Guozhang,
>
>
>
> When testing with a 2.6.1 broker and 2.7 streams application, I see the
> same behavior as described before with the 2.7 broker where just after a
> record is written to the changelog topic, the log segment is rolled and
> deleted citing that the retention time has passed (the record was written
> to the state store at ~15:49):
>
>
>
> [2021-03-29 15:49:13,757] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable
> segments with base offsets [0] due to retention time 259200000ms breach
> (kafka.log.Log)
> [2021-03-29 15:49:13,761] INFO [ProducerStateManager
> partition=test-stream-store-changelog-4] Writing producer snapshot at
> offset 1 (kafka.log.ProducerStateManager)
> [2021-03-29 15:49:13,763] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log
> segment at offset 1 in 5 ms. (kafka.log.Log)
> [2021-03-29 15:49:13,764] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments
> for deletion LogSegment(baseOffset=0, size=156,
> lastModifiedTime=1617050940118, largestTime=0) (kafka.log.Log)
> [2021-03-29 15:49:13,765] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log start
> offset to 1 due to segment deletion (kafka.log.Log)
>
>
>
> Does this have anything to do with the *largestTime=0* mentioned in the
> log? This was the first and only record written to the store/changelog. Is
> there anything else we can try to resolve this issue or give us more
> insight into where this issue could originate from?
>
>
>
> Thanks,
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *udesai@itrsgroup.com*
> <ud...@itrsgroup.com>
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Upesh Desai <ud...@itrsgroup.com>
> *Date: *Thursday, March 25, 2021 at 6:46 PM
> *To: *users@kafka.apache.org <us...@kafka.apache.org>
> *Cc: *Bart Lilje <bl...@itrsgroup.com>
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> We have not tried running 2.6 brokers and 2.7 client, I will try and get
> back to you.
>
>
>
> We are not enabling EOS on the streams, we have it set to AT_LEAST_ONCE.
> The shutdowns and restarts of the stream app are clean each time.
>
>
>
> I see in the broker logs certain lines indicating that the log segment is
> being rolled and deleted, but I don’t see how or why this should be
> happening when the records were just written. See the log line snippets
> included in the attached file. Initially 8 records are added (offsets 0-8),
> followed by a single record (offset 9). They are rolled and deleted almost
> instantly.
>
>
>
> Best,
>
> Upesh
>
>
>
> Upesh Desai | Senior Software Developer | udesai@itrsgroup.com
> www.itrsgroup.com <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang <wa...@gmail.com>
> *Date: *Thursday, March 25, 2021 at 6:31 PM
> *To: *Users <us...@kafka.apache.org>
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> BTW, yes that indicates the record in the changelog was already truncated
> (logically). But since we only physically truncate logs by segments, which
> is 1GB by default, it should still be physically on the log. Are you
> enabling EOS on Streams, and when you shutdown the streams app, is that a
> clean shutdown?
>
> On Thu, Mar 25, 2021 at 4:22 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> > That's indeed weird.
> >
> > Have you tried to run Kafka brokers with 2.6 while Kafka Streams client
> > with 2.7?
> >
> > On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai <ud...@itrsgroup.com>
> wrote:
> >
> >> Hello Guozhang,
> >>
> >>
> >>
> >> I have tried your suggestions with an inMemoryStore FYI and seen the
> >> following:
> >>
> >>
> >>
> >>    1. I have the record added to the state store, stopped the
> >>    application, and check the earliest and latest offsets via the
> command line
> >>    tools. This shows that the earliest offset is 1, and the latest
> offset is
> >>    also 1. Does this mean that the record has been marked for deletion
> >>    already? My retention.ms config is set to 3 days (259200000 ms), so
> >>    it should not be marked for deletion if added a couple minutes prior?
> >>    2. Following the above, this makes sense as well. When logging the
> >>    starting offset, it is not 0, but rather 1:
> >>
> >>    *topic: streamapp-teststore-changelog, partition: 4, start offset: 1,
> >>    end offset: 1*
> >>
> >>
> >>
> >> I also confirmed different behavior when we change the changelog topic
> >> cleanup policy from “*compact,delete”* to just “*compact”*. We DO NOT
> >> see this issue when the changelog is just set to compact. We also
> confirmed
> >> that this does not happen when we run everything on Kafka version 2.6.
> >>
> >>
> >>
> >> Thanks,
> >>
> >> Upesh
> >>
> >>
> >> Upesh Desai​  |  Senior Software Developer  |  *udesai@itrsgroup.com*
> >> <ud...@itrsgroup.com>
> >> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> >> <https://www.itrsgroup.com/>
> >>
> >> *From: *Guozhang Wang <wa...@gmail.com>
> >> *Date: *Thursday, March 25, 2021 at 4:01 PM
> >> *To: *Users <us...@kafka.apache.org>
> >> *Cc: *Bart Lilje <bl...@itrsgroup.com>
> >> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> >> changelog topics
> >>
> >> Hello Upesh,
> >>
> >> Could you confirm a few more things for me:
> >>
> >> 1. After you stopped the application, and wiped out the state dir; check
> >> if
> >> the corresponding changelog topic has one record indeed at offset 0 ---
> >> this can be done via the admin#listOffsets (get the earliest and latest
> >> offset, which should be 0 and 1 correspondingly).
> >> 2. After you resumed the application, check from which starting position
> >> we
> >> are restoring the changelog --- this can be done via implementing the
> >> `stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
> >> restoreEndOffset);`, should be 0
> >>
> >> If both of them check out fine as expected, then from the code I think
> >> bufferedLimitIndex should be updated to 1.
> >>
> >>
> >> Guozhang
> >>
> >> On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai <ud...@itrsgroup.com>
> wrote:
> >>
> >> > Hi Guozhang,
> >> >
> >> >
> >> >
> >> > Here are some of the answers to your questions I see during my
> testing:
> >> >
> >> >
> >> >
> >> >    1. ChangelogMetadata#restoreEndOffset == 1 ; This is expected as in
> >> my
> >> >    test 1 record had been added to the store. However the numRecords
> >> variable
> >> >    is still set to 0
> >> >    2. For that particular test, `hasRestoredToEnd()` indeed returns
> true
> >> >    as well. But it is confusing since the store is actually empty /
> that
> >> >    record I added does not exist in the store when trying to check for
> >> it.
> >> >    3. N/A
> >> >
> >> >
> >> >
> >> > A little more information, the records we add to this store/changelog
> >> are
> >> > of type <CustomKey,byte[]> where the value is always set to an empty
> >> byte
> >> > array `new byte[0]`. A couple other variations I have tried are
> setting
> >> to
> >> > a non-empty static byte array such as `new byte[1]` or `new
> byte[]{1}`.
> >> >
> >> >
> >> >
> >> > Hope this gives a little more clarity and hope to hear from you soon.
> >> >
> >> >
> >> >
> >> > Thanks,
> >> >
> >> > Upesh
> >> >
> >> >
> >> > Upesh Desai​  |  Senior Software Developer  |  *udesai@itrsgroup.com*
> >> > <ud...@itrsgroup.com>
> >> > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> >> > <https://www.itrsgroup.com/>
> >> >
> >> > *From: *Guozhang Wang <wa...@gmail.com>
> >> > *Date: *Wednesday, March 24, 2021 at 1:37 PM
> >> > *To: *Users <us...@kafka.apache.org>
> >> > *Cc: *Bart Lilje <bl...@itrsgroup.com>
> >> > *Subject: *Re: Kafka Streams Processor API state stores not restored
> via
> >> > changelog topics
> >> >
> >> > Hello Upesh,
> >> >
> >> > Thanks for the detailed report. I looked through the code and tried to
> >> > reproduce the issue, but so far have not succeeded. I think I may need
> >> some
> >> > further information from you to help my further investigation.
> >> >
> >> > 1) The `bufferedLimitIndex == 0` itself does not necessarily mean
> >> there's
> >> > an issue, as long as it could still be bumped later (i.e. it is
> possible
> >> > that the restore consumer has not fetched data yet). What's key
> though,
> >> is
> >> > to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it
> >> would
> >> > be created with null value, and then been initialized once.
> >> ChangelogReader
> >> > would stop restoring once the current offset has reached beyond this
> >> value
> >> > or if this value itself is 0.
> >> >
> >> > 2) If `restoreEndOffset` is initialized to a non-zero value, then
> check
> >> if
> >> > the restoration indeed completed without applying any records, this is
> >> > determined as `hasRestoredToEnd()` returning true.
> >> >
> >> > 3) If `restoreEndOffset` is initialized to 0, then we need to check
> >> why: on
> >> > top of my head I can only think of that the consumer's end offset
> >> request
> >> > gets the response with 0, indicating the changelog is now empty.
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai <ud...@itrsgroup.com>
> >> wrote:
> >> >
> >> > > Hi all,
> >> > >
> >> > >
> >> > >
> >> > > Our team think we discovered a bug over the weekend within the
> Kafka
> >> > > Streams / Processor API. We are running 2.7.0.
> >> > >
> >> > >
> >> > >
> >> > > When configuring a state store backed by a changelog topic with the
> >> > > cleanup policy configuration set to “compact,delete”:
> >> > >
> >> > >
> >> > >
> >> > > final StoreBuilder<KeyValueStore<k,v>> store = Stores
> >> > >   .keyValueStoreBuilder(
> >> > >     Stores.persistentKeyValueStore(STORE_ID),
> >> > >     kSerde,
> >> > >     vSerde)
> >> > >   .withLoggingEnabled(Map.of(
> >> > >     RETENTION_MS_CONFIG, "90000000",
> >> > >     CLEANUP_POLICY_CONFIG, "compact,delete"))
> >> > >   .withCachingEnabled();
> >> > >
> >> > >
> >> > >
> >> > > Here is how we reproduced the problem:
> >> > >
> >> > >    1. Records are written to the state store, and subsequently
> >> produced
> >> > >    to the changelog topic.
> >> > >    2. Stop streams application
> >> > >    3. Delete state.dir directory
> >> > >    4. Restart streams application
> >> > >    5. Confirm state store is initialized empty with no records
> >> restored
> >> > >    from changelog
> >> > >
> >> > >
> >> > >
> >> > > We see this problem with both in-memory and RocksDB backed state
> >> stores.
> >> > > For persistent state store, if the streams application is restarted
> >> > without
> >> > > the state dir being deleted, the application still does not
> “restore”
> >> > from
> >> > > the changelog, but records are still seen in the state store.
> >> > >
> >> > >
> >> > >
> >> > > When rolling back to 2.6, we do not see this issue.
> >> > >
> >> > >
> >> > >
> >> > > Doing some debugging in the source code, in the StoreChangelogReader
> >> > class
> >> > > I found that the number of records to restore is always 0 based on
> the
> >> > > below snippet:
> >> > >
> >> > >
> >> > >
> >> > > private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
> >> > >     final ProcessorStateManager stateManager = changelogMetadata.stateManager;
> >> > >     final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
> >> > >     final TopicPartition partition = storeMetadata.changelogPartition();
> >> > >     final String storeName = storeMetadata.store().name();
> >> > >     final int numRecords = changelogMetadata.bufferedLimitIndex;
> >> > >
> >> > >
> >> > >
> >> > > Where ‘changelogMetadata.bufferedLimitIndex’ always == 0.
> >> > >
> >> > >
> >> > >
> >> > > My question to you all is, 1) Is this expected behavior? 2) If not,
> >> is it
> >> > > a bug?
> >> > >
> >> > >
> >> > >
> >> > > Hope to get some clarity, and thanks in advance!
> >> > >
> >> > >
> >> > >
> >> > > Best,
> >> > > Upesh
> >> > > <https://www.itrsgroup.com/>
> >> > > Upesh Desai​
> >> > > Senior Software Developer
> >> > > *udesai@itrsgroup.com* <ud...@itrsgroup.com>
> >> > > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> >> > >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >> >
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >>
> >>
> >
> >
> > --
> > -- Guozhang
> >
>
>
> --
> -- Guozhang
>
>
>


--
-- Guozhang

Disclaimer

The information contained in this communication from the sender is confidential. It is intended solely for use by the recipient and others authorized to receive it. If you are not the recipient, you are hereby notified that any disclosure, copying, distribution or taking action in relation of the contents of this information is strictly prohibited and may be unlawful.

This email has been scanned for viruses and malware, and may have been automatically archived by Mimecast Ltd, an innovator in Software as a Service (SaaS) for business. Providing a safer and more useful place for your human generated data. Specializing in; Security, archiving and compliance. To find out more visit the Mimecast website.

Re: Kafka Streams Processor API state stores not restored via changelog topics

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Upesh,

These are super helpful logs, and I think I'm very close to the root cause
of it. You see, the written changelog record's timestamp is set to 0
(i.e. January 1st 1970 at midnight GMT), and hence given a reasonable Kafka
server start time (presumably in the 21st century), the retention time would
always be breached, causing the log deletion mechanism to trigger.

The timestamp is set with `context.timestamp()`, which would use the
processing record's timestamp; but I have seen and fixed a bug (
https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp was
not populated, and hence set to 0, if the record was generated as part of a
punctuation. So my next key question is: is this changelog record generated,
i.e. its put call triggered, from processing an input record, or from a
punctuation call?
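
The arithmetic behind the retention breach can be sketched in a few lines of Java. This is a simplified model under stated assumptions, not the broker's code: `retentionBreached` is a hypothetical helper, the check is reduced to "now minus the segment's largest record timestamp exceeds retention.ms", and the `lastModifiedTime` value from the quoted broker log is used as an approximate "now".

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model of a time-based retention check; the class and method
// names are hypothetical and this is not the broker's implementation.
final class RetentionBreachSketch {

    // A segment is treated as expired when its newest record is older than
    // the retention period.
    static boolean retentionBreached(long nowMs, long largestTimestampMs, long retentionMs) {
        return nowMs - largestTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        // 3 days = 259200000 ms, the retention reported in the quoted broker log.
        long retentionMs = Duration.ofDays(3).toMillis();
        // lastModifiedTime from the quoted broker log, used here as "now".
        long nowMs = 1_617_050_940_118L;

        // Timestamp 0 is the Unix epoch, 1970-01-01T00:00:00Z.
        System.out.println(Instant.ofEpochMilli(0L));

        // A record stamped 0 is roughly 51 years old, far beyond 3 days.
        System.out.println(retentionBreached(nowMs, 0L, retentionMs));

        // A record stamped one minute before "now" is well within retention.
        System.out.println(retentionBreached(nowMs, nowMs - 60_000L, retentionMs));
    }
}
```

With `largestTime=0`, as in the log, the check is true no matter how recently the record was actually written, which would match the segment being rolled and deleted almost instantly.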


Guozhang

On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai <ud...@itrsgroup.com> wrote:

> Hi Guozhang,
>
>
>
> When testing with a 2.6.1 broker and 2.7 streams application, I see the
> same behavior as described before with the 2.7 broker where just after a
> record is written to the changelog topic, the log segment is rolled and
> deleted citing that the retention time has passed (the record was written
> to the state store at ~15:49):
>
>
>
> [2021-03-29 15:49:13,757] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable
> segments with base offsets [0] due to retention time 259200000ms breach
> (kafka.log.Log)
> [2021-03-29 15:49:13,761] INFO [ProducerStateManager
> partition=test-stream-store-changelog-4] Writing producer snapshot at
> offset 1 (kafka.log.ProducerStateManager)
> [2021-03-29 15:49:13,763] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log
> segment at offset 1 in 5 ms. (kafka.log.Log)
> [2021-03-29 15:49:13,764] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments
> for deletion LogSegment(baseOffset=0, size=156,
> lastModifiedTime=1617050940118, largestTime=0) (kafka.log.Log)
> [2021-03-29 15:49:13,765] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log start
> offset to 1 due to segment deletion (kafka.log.Log)
>
>
>
> Does this have anything to do with the *largestTime=0* mentioned in the
> log? This was the first and only record written to the store/changelog. Is
> there anything else we can try to resolve this issue or give us more
> insight into where this issue could originate from?
>
>
>
> Thanks,
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *udesai@itrsgroup.com*
> <ud...@itrsgroup.com>
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Upesh Desai <ud...@itrsgroup.com>
> *Date: *Thursday, March 25, 2021 at 6:46 PM
> *To: *users@kafka.apache.org <us...@kafka.apache.org>
> *Cc: *Bart Lilje <bl...@itrsgroup.com>
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> We have not tried running 2.6 brokers and 2.7 client, I will try and get
> back to you.
>
>
>
> We are not enabling EOS on the streams, we have it set to AT_LEAST_ONCE.
> The shutdowns and restarts of the stream app are clean each time.
>
>
>
> I see in the broker logs certain lines indicating that the log segment is
> being rolled and deleted, but I don’t see how or why this should be
> happening when the records were just written. See the log line snippets
> included in the attached file. Initially 8 records are added (offsets 0-8),
> followed by a single record (offset 9). They are rolled and deleted almost
> instantly.
>
>
>
> Best,
>
> Upesh
>
>
>
> Upesh Desai | Senior Software Developer | udesai@itrsgroup.com
> www.itrsgroup.com <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang <wa...@gmail.com>
> *Date: *Thursday, March 25, 2021 at 6:31 PM
> *To: *Users <us...@kafka.apache.org>
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> BTW, yes that indicates the record in the changelog was already truncated
> (logically). But since we only physically truncate logs by segments, which
> is 1GB by default, it should still be physically on the log. Are you
> enabling EOS on Streams, and when you shutdown the streams app, is that a
> clean shutdown?
>
> On Thu, Mar 25, 2021 at 4:22 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> > That's indeed weird.
> >
> > Have you tried to run Kafka brokers with 2.6 while Kafka Streams client
> > with 2.7?
> >
> > On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai <ud...@itrsgroup.com>
> wrote:
> >
> >> Hello Guozhang,
> >>
> >>
> >>
> >> I have tried your suggestions with an inMemoryStore FYI and seen the
> >> following:
> >>
> >>
> >>
> >>    1. I have the record added to the state store, stopped the
> >>    application, and check the earliest and latest offsets via the
> command line
> >>    tools. This shows that the earliest offset is 1, and the latest
> offset is
> >>    also 1. Does this mean that the record has been marked for deletion
> >>    already? My retention.ms config is set to 3 days (259200000 ms), so
> >>    it should not be marked for deletion if added a couple minutes prior?
> >>    2. Following the above, this makes sense as well. When logging the
> >>    starting offset, it is not 0, but rather 1:
> >>
> >>    *topic: streamapp-teststore-changelog, partition: 4, start offset: 1,
> >>    end offset: 1*
> >>
> >>
> >>
> >> I also confirmed different behavior when we change the changelog topic
> >> cleanup policy from “*compact,delete”* to just “*compact”*. We DO NOT
> >> see this issue when the changelog is just set to compact. We also
> confirmed
> >> that this does not happen when we run everything on Kafka version 2.6.
> >>
> >>
> >>
> >> Thanks,
> >>
> >> Upesh
> >>
> >>
> >> Upesh Desai​  |  Senior Software Developer  |  *udesai@itrsgroup.com*
> >> <ud...@itrsgroup.com>
> >> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> >> <https://www.itrsgroup.com/>
> >>


-- 
-- Guozhang

Re: Kafka Streams Processor API state stores not restored via changelog topics

Posted by Upesh Desai <ud...@itrsgroup.com>.
Hi Guozhang,

When testing with a 2.6.1 broker and a 2.7 streams application, I see the same behavior as described before with the 2.7 broker: just after a record is written to the changelog topic, the log segment is rolled and deleted, citing that the retention time has passed (the record was written to the state store at ~15:49):

[2021-03-29 15:49:13,757] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable segments with base offsets [0] due to retention time 259200000ms breach (kafka.log.Log)
[2021-03-29 15:49:13,761] INFO [ProducerStateManager partition=test-stream-store-changelog-4] Writing producer snapshot at offset 1 (kafka.log.ProducerStateManager)
[2021-03-29 15:49:13,763] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log segment at offset 1 in 5 ms. (kafka.log.Log)
[2021-03-29 15:49:13,764] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments for deletion LogSegment(baseOffset=0, size=156, lastModifiedTime=1617050940118, largestTime=0) (kafka.log.Log)
[2021-03-29 15:49:13,765] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log start offset to 1 due to segment deletion (kafka.log.Log)

Does this have anything to do with the largestTime=0 mentioned in the log? This was the first and only record written to the store/changelog. Is there anything else we can try to resolve this issue, or that would give us more insight into where it could originate?
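
On the largestTime=0 question, here is a minimal sketch of how a time-based retention check could treat a freshly written segment as expired. It assumes the broker compares the current time against the largest record timestamp in the segment (the largestTime field in the log line above) rather than the file's modification time. That is an assumption about broker behaviour, and the class and method names below are hypothetical, not broker code. The "now" value is also assumed (10 seconds after lastModifiedTime).

```java
public class RetentionCheckSketch {

    // Hypothetical helper: true when the segment's largest timestamp is
    // further in the past than the retention window allows.
    public static boolean breachesRetention(long nowMs, long largestTimestampMs, long retentionMs) {
        return nowMs - largestTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        long retentionMs = 259_200_000L;            // 3 days, as reported in the broker log
        long lastModifiedMs = 1_617_050_940_118L;   // lastModifiedTime from the broker log
        long nowMs = lastModifiedMs + 10_000L;      // assumed: check runs ~10 s after the write

        // A segment whose largest record timestamp is 0 (the epoch) looks ~51 years old.
        System.out.println(breachesRetention(nowMs, 0L, retentionMs));             // prints "true"

        // The same segment judged by its modification time is only 10 s old.
        System.out.println(breachesRetention(nowMs, lastModifiedMs, retentionMs)); // prints "false"
    }
}
```

If that assumption holds, a changelog record carrying timestamp 0 would fall outside a 3-day retention window immediately, which would be consistent with the segment being rolled and deleted right after the write.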

Thanks,
Upesh


Upesh Desai | Senior Software Developer | udesai@itrsgroup.com
www.itrsgroup.com


Re: Kafka Streams Processor API state stores not restored via changelog topics

Posted by Upesh Desai <ud...@itrsgroup.com>.
We have not tried running 2.6 brokers with a 2.7 client; I will try that and get back to you.

We are not enabling EOS on the streams; we have it set to AT_LEAST_ONCE. The shutdowns and restarts of the stream app are clean each time.

I see lines in the broker logs indicating that the log segment is being rolled and deleted, but I don’t see how or why this should be happening when the records were just written. See the log line snippets included in the attached file. Initially 8 records are added (offsets 0-8), followed by a single record (offset 9). They are rolled and deleted almost instantly.

Best,
Upesh


Upesh Desai | Senior Software Developer | udesai@itrsgroup.com
www.itrsgroup.com
From: Guozhang Wang <wa...@gmail.com>
Date: Thursday, March 25, 2021 at 6:31 PM
To: Users <us...@kafka.apache.org>
Subject: Re: Kafka Streams Processor API state stores not restored via changelog topics
BTW, yes, that indicates the record in the changelog was already truncated
(logically). But since we only physically truncate logs by segments, which
are 1GB by default, it should still be physically on the log. Are you
enabling EOS on Streams, and when you shut down the streams app, is that a
clean shutdown?

On Thu, Mar 25, 2021 at 4:22 PM Guozhang Wang <wa...@gmail.com> wrote:

> That's indeed weird.
>
> Have you tried running the Kafka brokers on 2.6 while keeping the Kafka
> Streams client on 2.7?
>
> On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai <ud...@itrsgroup.com> wrote:
>
>> Hello Guozhang,
>>
>>
>>
>> I have tried your suggestions with an inMemoryStore FYI and seen the
>> following:
>>
>>
>>
>>    1. I added the record to the state store, stopped the
>>    application, and checked the earliest and latest offsets via the command line
>>    tools. This shows that the earliest offset is 1, and the latest offset is
>>    also 1. Does this mean that the record has already been marked for
>>    deletion? My retention.ms config is set to 3 days (259200000 ms), so
>>    it should not be marked for deletion if it was added a couple of minutes prior.
>>    2. Following the above, this makes sense as well. When logging the
>>    starting offset, it is not 0, but rather 1:
>>
>>    *topic: streamapp-teststore-changelog, partition: 4, start offset: 1,
>>    end offset: 1*
>>
>>
>>
>> I also confirmed different behavior when we change the changelog topic
>> cleanup policy from “compact,delete” to just “compact”. We DO NOT
>> see this issue when the changelog is just set to compact. We also confirmed
>> that this does not happen when we run everything on Kafka version 2.6.
>>
>>
>>
>> Thanks,
>>
>> Upesh
>>
>>
>> Upesh Desai​  |  Senior Software Developer  |  *udesai@itrsgroup.com*
>> <ud...@itrsgroup.com>
>> *www.itrsgroup.com* <https://www.itrsgroup.com/>
>> <https://www.itrsgroup.com/>
>>
>> *From: *Guozhang Wang <wa...@gmail.com>
>> *Date: *Thursday, March 25, 2021 at 4:01 PM
>> *To: *Users <us...@kafka.apache.org>
>> *Cc: *Bart Lilje <bl...@itrsgroup.com>
>> *Subject: *Re: Kafka Streams Processor API state stores not restored via
>> changelog topics
>>
>> Hello Upesh,
>>
>> Could you confirm a few more things for me:
>>
>> 1. After you have stopped the application and wiped out the state dir, check
>> whether the corresponding changelog topic indeed has one record at offset 0 ---
>> this can be done via admin#listOffsets (get the earliest and latest
>> offsets, which should be 0 and 1 respectively).
>> 2. After you have resumed the application, check from which starting position
>> we are restoring the changelog --- this can be done by implementing
>> `stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
>> restoreEndOffset);`; the start offset should be 0.
>>
>> If both of them check out as expected, then from the code I think
>> bufferedLimitIndex should be updated to 1.
>>
>>
>> Guozhang
>>
>> On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai <ud...@itrsgroup.com> wrote:
>>
>> > Hi Guozhang,
>> >
>> >
>> >
>> > Here are some of the answers to your questions I see during my testing:
>> >
>> >
>> >
>> >    1. ChangelogMetadata#restoreEndOffset == 1; this is expected, as in my
>> >    test 1 record had been added to the store. However, the numRecords
>> >    variable is still set to 0.
>> >    2. For that particular test, `hasRestoredToEnd()` indeed returns true
>> >    as well. But this is confusing, since the store is actually empty: the
>> >    record I added does not exist in the store when I check for it.
>> >    3. N/A
>> >
>> >
>> >
>> > A little more information: the records we add to this store/changelog are
>> > of type <CustomKey,byte[]>, where the value is always set to an empty byte
>> > array `new byte[0]`. A couple of other variations I have tried are setting
>> > it to a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.
>> >
>> >
>> >
>> > Hope this gives a little more clarity and hope to hear from you soon.
>> >
>> >
>> >
>> > Thanks,
>> >
>> > Upesh
>> >
>> >
>> > Upesh Desai​  |  Senior Software Developer  |  *udesai@itrsgroup.com*
>> > <ud...@itrsgroup.com>
>> > *www.itrsgroup.com* <https://www.itrsgroup.com/>
>> > <https://www.itrsgroup.com/>
>> >
>> > *From: *Guozhang Wang <wa...@gmail.com>
>> > *Date: *Wednesday, March 24, 2021 at 1:37 PM
>> > *To: *Users <us...@kafka.apache.org>
>> > *Cc: *Bart Lilje <bl...@itrsgroup.com>
>> > *Subject: *Re: Kafka Streams Processor API state stores not restored via
>> > changelog topics
>> >
>> > Hello Upesh,
>> >
>> > Thanks for the detailed report. I looked through the code and tried to
>> > reproduce the issue, but so far have not succeeded. I think I may need some
>> > further information from you to help my investigation.
>> >
>> > 1) `bufferedLimitIndex == 0` by itself does not necessarily mean there's
>> > an issue, as long as it could still be bumped later (i.e. it is possible
>> > that the restore consumer has not fetched data yet). What's key, though, is
>> > to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would
>> > be created with a null value and then be initialized once. ChangelogReader
>> > would stop restoring once the current offset has reached beyond this value,
>> > or if this value itself is 0.
>> >
>> > 2) If `restoreEndOffset` is initialized to a non-zero value, then check
>> > whether the restoration indeed completed without applying any records; this
>> > is determined by `hasRestoredToEnd()` returning true.
>> >
>> > 3) If `restoreEndOffset` is initialized to 0, then we need to check why: off
>> > the top of my head, I can only think of the consumer's end offset request
>> > getting a response of 0, indicating the changelog is now empty.
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai <ud...@itrsgroup.com> wrote:
>> >
>> > > Hi all,
>> > >
>> > >
>> > >
>> > > Our team thinks we discovered a bug over the weekend within the Kafka
>> > > Streams / Processor API. We are running 2.7.0.
>> > >
>> > >
>> > >
>> > > When configuring a state store backed by a changelog topic with the
>> > > cleanup policy configuration set to “compact,delete”:
>> > >
>> > >
>> > >
>> > > final StoreBuilder<KeyValueStore<k,v>> store = Stores
>> > >   .keyValueStoreBuilder(
>> > >     Stores.persistentKeyValueStore(STORE_ID),
>> > >     kSerde,
>> > >     vSerde)
>> > >   .withLoggingEnabled(Map.of(
>> > >     RETENTION_MS_CONFIG, "90000000",
>> > >     CLEANUP_POLICY_CONFIG, "compact,delete"))
>> > >   .withCachingEnabled();
>> > >
>> > >
>> > >
>> > > Here is how we reproduced the problem:
>> > >
>> > >    1. Records are written to the state store, and subsequently
>> produced
>> > >    to the changelog topic.
>> > >    2. Store streams application
>> > >    3. Delete state.dir directory
>> > >    4. Restart streams application
>> > >    5. Confirm state store is initialized empty with no records
>> restored
>> > >    from changelog
>> > >
>> > >
>> > >
>> > > We see this problem with both in-memory and RocksDB backed state
>> stores.
>> > > For persistent state store, if the streams application is restarted
>> > without
>> > > the state dir being deleted, the application still does not “restore”
>> > from
>> > > the changelog, but records are still seen in the state store.
>> > >
>> > >
>> > >
>> > > When rolling back to 2.6, we do not see this issue.
>> > >
>> > >
>> > >
>> > > Doing some debugging in the source code, in the StoreChangelogReader
>> > class
>> > > I found that the number of records to restore is always 0 based on the
>> > > below snippet:
>> > >
>> > >
>> > >
>> > > private void restoreChangelog(final ChangelogMetadata
>> changelogMetadata)
>> > {
>> > >     final ProcessorStateManager stateManager =
>> > changelogMetadata.stateManager;
>> > >     final StateStoreMetadata storeMetadata =
>> > changelogMetadata.storeMetadata;
>> > >     final TopicPartition partition =
>> storeMetadata.changelogPartition();
>> > >     final String storeName = storeMetadata.store().name();
>> > >     final int numRecords = changelogMetadata.bufferedLimitIndex;
>> > >
>> > >
>> > >
>> > > Where ‘changelogMetadata.bufferedLimitIndex’ always == 0.
>> > >
>> > >
>> > >
>> > > My question to you all is, 1) Is this expected behavior? 2) If not,
>> is it
>> > > a bug?
>> > >
>> > >
>> > >
>> > > Hope to get some clarity, and thanks in advance!
>> > >
>> > >
>> > >
>> > > Best,
>> > > Upesh
>> > > <https://www.itrsgroup.com/>
>> > > Upesh Desai​
>> > > Senior Software Developer
>> > > *udesai@itrsgroup.com* <ud...@itrsgroup.com>
>> > > *www.itrsgroup.com* <https://www.itrsgroup.com/>
>> > > Internet communications are not secure and therefore the ITRS Group
>> does
>> > > not accept legal responsibility for the contents of this message. Any
>> > view
>> > > or opinions presented are solely those of the author and do not
>> > necessarily
>> > > represent those of the ITRS Group unless otherwise specifically
>> stated.
>> > > [itrs.email.signature]
>> > >
>> > >
>> > > *Disclaimer*
>> > >
>> > > The information contained in this communication from the sender is
>> > > confidential. It is intended solely for use by the recipient and
>> others
>> > > authorized to receive it. If you are not the recipient, you are hereby
>> > > notified that any disclosure, copying, distribution or taking action
>> in
>> > > relation of the contents of this information is strictly prohibited
>> and
>> > may
>> > > be unlawful.
>> > >
>> > > This email has been scanned for viruses and malware, and may have been
>> > > automatically archived by *Mimecast Ltd*, an innovator in Software as
>> a
>> > > Service (SaaS) for business. Providing a *safer* and *more useful*
>> place
>> > > for your human generated data. Specializing in; Security, archiving
>> and
>> > > compliance.
>> > >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>> >
>> > *Disclaimer*
>> >
>> > The information contained in this communication from the sender is
>> > confidential. It is intended solely for use by the recipient and others
>> > authorized to receive it. If you are not the recipient, you are hereby
>> > notified that any disclosure, copying, distribution or taking action in
>> > relation of the contents of this information is strictly prohibited and
>> may
>> > be unlawful.
>> >
>> > This email has been scanned for viruses and malware, and may have been
>> > automatically archived by *Mimecast Ltd*, an innovator in Software as a
>> > Service (SaaS) for business. Providing a *safer* and *more useful* place
>> > for your human generated data. Specializing in; Security, archiving and
>> > compliance.
>> >
>>
>>
>> --
>> -- Guozhang
>>
>>
>> *Disclaimer*
>>
>> The information contained in this communication from the sender is
>> confidential. It is intended solely for use by the recipient and others
>> authorized to receive it. If you are not the recipient, you are hereby
>> notified that any disclosure, copying, distribution or taking action in
>> relation of the contents of this information is strictly prohibited and may
>> be unlawful.
>>
>> This email has been scanned for viruses and malware, and may have been
>> automatically archived by *Mimecast Ltd*, an innovator in Software as a
>> Service (SaaS) for business. Providing a *safer* and *more useful* place
>> for your human generated data. Specializing in; Security, archiving and
>> compliance.
>>
>
>
> --
> -- Guozhang
>


--
-- Guozhang


Re: Kafka Streams Processor API state stores not restored via changelog topics

Posted by Guozhang Wang <wa...@gmail.com>.
BTW, yes, that indicates the record in the changelog was already truncated
(logically). But since we only physically truncate logs by segment, and a
segment is 1GB by default, the record should still be physically in the log.
Are you enabling EOS on Streams, and when you shut down the Streams app, is it
a clean shutdown?
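
To illustrate the logical truncation mentioned here, the following is a minimal sketch in plain Java with no Kafka dependency. `OffsetRange` and its methods are hypothetical names, not Kafka APIs. It only shows how a partition's earliest and latest offsets bound what a reader can still fetch: once the earliest offset has advanced to the latest offset, nothing is readable, even if the bytes still sit in a segment file on disk.

```java
/** Hypothetical helper, not part of Kafka: interprets a partition's offset range. */
final class OffsetRange {
    final long earliest; // log start offset: first offset a consumer may still read
    final long latest;   // log end offset: offset the next produced record will get

    OffsetRange(long earliest, long latest) {
        if (earliest < 0 || latest < earliest) {
            throw new IllegalArgumentException("invalid range: " + earliest + ".." + latest);
        }
        this.earliest = earliest;
        this.latest = latest;
    }

    /** Upper bound on readable records; compaction or transaction markers can make the real count lower. */
    long maxReadableRecords() {
        return latest - earliest;
    }

    /** True if the log start offset has moved past 0, i.e. earlier offsets were logically truncated. */
    boolean logicallyTruncated() {
        return earliest > 0;
    }

    public static void main(String[] args) {
        OffsetRange expected = new OffsetRange(0, 1); // one record written, nothing truncated
        OffsetRange observed = new OffsetRange(1, 1); // the range reported in this thread
        System.out.println(expected.maxReadableRecords() + " " + expected.logicallyTruncated());
        System.out.println(observed.maxReadableRecords() + " " + observed.logicallyTruncated());
    }
}
```

With the reported range (earliest 1, latest 1) the readable span is empty, which is consistent with a restore that applies no records.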



-- 
-- Guozhang

Re: Kafka Streams Processor API state stores not restored via changelog topics

Posted by Guozhang Wang <wa...@gmail.com>.
That's indeed weird.

Have you tried running the Kafka brokers on 2.6 with the Kafka Streams client
on 2.7?


-- 
-- Guozhang

Re: Kafka Streams Processor API state stores not restored via changelog topics

Posted by Upesh Desai <ud...@itrsgroup.com>.
Hello Guozhang,

I tried your suggestions with an inMemoryStore, FYI, and saw the following:


  1.  I added the record to the state store, stopped the application, and checked the earliest and latest offsets via the command-line tools. Both the earliest offset and the latest offset are 1. Does this mean that the record has already been marked for deletion? My retention.ms config is set to 3 days (259200000 ms), so it should not be marked for deletion if it was added only a couple of minutes earlier?
  2.  Following from the above, this makes sense as well. When logging the starting offset, it is not 0, but rather 1:

topic: streamapp-teststore-changelog, partition: 4, start offset: 1, end offset: 1
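
Those offsets would explain why the restore finishes without applying anything. The sketch below is a simplified model, in plain Java, of the completion rule Guozhang described earlier in the thread (restoring stops once the current offset has reached the end offset, or if the end offset is 0). It is not the Kafka source: `RestoreCheckModel` is a hypothetical name, and treating an uninitialized (null) end offset as "not finished" is an assumption of the model.

```java
/** Simplified, hypothetical model of the restore-completion rule; not Kafka source code. */
final class RestoreCheckModel {
    /**
     * @param restoreEndOffset end offset captured for the changelog, or null if not yet initialized
     * @param currentOffset    position the restore consumer has reached
     */
    static boolean hasRestoredToEnd(Long restoreEndOffset, long currentOffset) {
        if (restoreEndOffset == null) {
            return false; // assumption: end offset not known yet, so not complete
        }
        if (restoreEndOffset == 0L) {
            return true; // empty changelog: nothing to restore
        }
        return currentOffset >= restoreEndOffset; // reached (or passed) the end offset
    }

    public static void main(String[] args) {
        System.out.println(hasRestoredToEnd(null, 0L)); // end offset unknown
        System.out.println(hasRestoredToEnd(0L, 0L));   // empty changelog
        System.out.println(hasRestoredToEnd(1L, 0L));   // one record still to read
        System.out.println(hasRestoredToEnd(1L, 1L));   // start offset 1, end offset 1, as logged above
    }
}
```

In this model the last case is already complete at the starting position, so zero records are handed to the store, which matches a `numRecords` of 0 together with `hasRestoredToEnd()` returning true.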

I also confirmed different behavior when we change the changelog topic cleanup policy from “compact,delete” to just “compact”. We DO NOT see this issue when the changelog is just set to compact. We also confirmed that this does not happen when we run everything on Kafka version 2.6.

Thanks,
Upesh


Upesh Desai | Senior Software Developer | udesai@itrsgroup.com
www.itrsgroup.com

From: Guozhang Wang <wa...@gmail.com>
Date: Thursday, March 25, 2021 at 4:01 PM
To: Users <us...@kafka.apache.org>
Cc: Bart Lilje <bl...@itrsgroup.com>
Subject: Re: Kafka Streams Processor API state stores not restored via changelog topics

Hello Upesh,

Could you confirm a few more things for me:

1. After you have stopped the application and wiped out the state dir, check
whether the corresponding changelog topic indeed has one record at offset 0.
This can be done via Admin#listOffsets (get the earliest and latest
offsets, which should be 0 and 1 respectively).
2. After you have resumed the application, check from which starting position
we are restoring the changelog. This can be done by implementing
`StateRestoreListener.onRestoreStart(partition, storeName, startOffset,
restoreEndOffset)`; the starting offset should be 0.

If both of them check out as expected, then from the code I think
bufferedLimitIndex should be updated to 1.
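
The two checks above could be sketched roughly as follows. This is an illustration rather than a tested diagnostic: it assumes kafka-clients and kafka-streams 2.5 or newer on the classpath, the class names `LoggingRestoreListener` and `ChangelogOffsetCheck` are hypothetical, and the broker address is a placeholder. The topic name and partition are the ones from the log line in this thread.

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

/** Logs where restoration starts and ends for every store (hypothetical class name). */
final class LoggingRestoreListener implements StateRestoreListener {
    volatile long lastStartingOffset = -1L; // remembered so the values can be inspected later
    volatile long lastEndingOffset = -1L;

    @Override
    public void onRestoreStart(TopicPartition partition, String storeName,
                               long startingOffset, long endingOffset) {
        lastStartingOffset = startingOffset;
        lastEndingOffset = endingOffset;
        System.out.printf("topic: %s, partition: %d, start offset: %d, end offset: %d%n",
                partition.topic(), partition.partition(), startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition partition, String storeName,
                                long batchEndOffset, long numRestored) {
        System.out.printf("restored %d records into %s up to offset %d%n",
                numRestored, storeName, batchEndOffset);
    }

    @Override
    public void onRestoreEnd(TopicPartition partition, String storeName, long totalRestored) {
        System.out.printf("finished restoring %s: %d records in total%n", storeName, totalRestored);
    }
}

final class ChangelogOffsetCheck {
    /** Prints the earliest and latest offsets of one changelog partition. */
    static void printOffsets(String bootstrapServers, TopicPartition tp) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (Admin admin = Admin.create(props)) {
            long earliest = admin.listOffsets(Map.of(tp, OffsetSpec.earliest()))
                    .all().get().get(tp).offset();
            long latest = admin.listOffsets(Map.of(tp, OffsetSpec.latest()))
                    .all().get().get(tp).offset();
            System.out.printf("%s earliest=%d latest=%d%n", tp, earliest, latest);
        }
    }

    public static void main(String[] args) throws Exception {
        // Placeholder broker address; topic and partition taken from the log line in this thread.
        printOffsets("localhost:9092", new TopicPartition("streamapp-teststore-changelog", 4));
    }
}
```

The listener would be registered with `KafkaStreams#setGlobalStateRestoreListener(new LoggingRestoreListener())` before calling `start()`.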


Guozhang

On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai <ud...@itrsgroup.com> wrote:

> Hi Guozhang,
>
> Here are the answers to your questions, based on what I see during my testing:
>
>    1. ChangelogMetadata#restoreEndOffset == 1. This is expected, as in my
>    test 1 record had been added to the store. However, the numRecords variable
>    is still set to 0.
>    2. For that particular test, `hasRestoredToEnd()` indeed returns true
>    as well. But this is confusing, since the store is actually empty: the
>    record I added does not exist in the store when I check for it.
>    3. N/A
>
> A little more information: the records we add to this store/changelog are
> of type <CustomKey,byte[]>, where the value is always set to an empty byte
> array `new byte[0]`. A couple of other variations I have tried are setting it
> to a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.
>
> Hope this gives a little more clarity and hope to hear from you soon.
>
> Thanks,
> Upesh
>
> From: Guozhang Wang <wa...@gmail.com>
> Date: Wednesday, March 24, 2021 at 1:37 PM
> To: Users <us...@kafka.apache.org>
> Cc: Bart Lilje <bl...@itrsgroup.com>
> Subject: Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Hello Upesh,
>
> Thanks for the detailed report. I looked through the code and tried to
> reproduce the issue, but so far have not succeeded. I think I need some
> further information from you to help my investigation.
>
> 1) `bufferedLimitIndex == 0` by itself does not necessarily mean there's
> an issue, as long as it can still be bumped later (i.e. it is possible
> that the restore consumer has not fetched data yet). What's key, though, is
> to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it is
> created with a null value and then initialized once. ChangelogReader
> stops restoring once the current offset has reached or gone beyond this
> value, or if this value itself is 0.
>
> 2) If `restoreEndOffset` is initialized to a non-zero value, then check
> whether the restoration indeed completed without applying any records;
> completion is determined by `hasRestoredToEnd()` returning true.
>
> 3) If `restoreEndOffset` is initialized to 0, then we need to check why: off
> the top of my head, I can only think of the consumer's end offset request
> getting a response of 0, indicating that the changelog is now empty.
>
>
> Guozhang
>
>
> On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai <ud...@itrsgroup.com> wrote:
>
> > Hi all,
> >
> >
> >
> > Our team think we discovered a bug over the weekend withing the Kafka
> > Streams / Processor API. We are running 2.7.0.
> >
> >
> >
> > When configuring a state store backed by a changelog topic with the
> > cleanup policy configuration set to “compact,delete”:
> >
> >
> >
> > final StoreBuilder<KeyValueStore<k,v>> store = Stores
> >   .*keyValueStoreBuilder*(
> >     Stores.*persistentKeyValueStore*(*STORE_ID*),
> >     kSerde,
> >     vSerde)
> >   .withLoggingEnabled(Map.*of*(
> >     *RETENTION_MS_CONFIG*, "90000000"),
> >     *CLEANUP_POLICY_CONFIG*, "compact,delete"))
> >   .withCachingEnabled();
> >
> >
> >
> > Here is how we reproduced the problem:
> >
> >    1. Records are written to the state store, and subsequently produced
> >    to the changelog topic.
> >    2. Store streams application
> >    3. Delete state.dir directory
> >    4. Restart streams application
> >    5. Confirm state store is initialized empty with no records restored
> >    from changelog
> >
> >
> >
> > We see this problem with both in-memory and RocksDB backed state stores.
> > For persistent state store, if the streams application is restarted
> without
> > the state dir being deleted, the application still does not “restore”
> from
> > the changelog, but records are still seen in the state store.
> >
> >
> >
> > When rolling back to 2.6, we do not see this issue.
> >
> >
> >
> > Doing some debugging in the source code, in the StoreChangelogReader
> class
> > I found that the number of records to restore is always 0 based on the
> > below snippet:
> >
> >
> >
> > private void restoreChangelog(final ChangelogMetadata changelogMetadata)
> {
> >     final ProcessorStateManager stateManager =
> changelogMetadata.stateManager;
> >     final StateStoreMetadata storeMetadata =
> changelogMetadata.storeMetadata;
> >     final TopicPartition partition = storeMetadata.changelogPartition();
> >     final String storeName = storeMetadata.store().name();
> >     final int numRecords = changelogMetadata.bufferedLimitIndex;
> >
> >
> >
> > Where ‘changelogMetadata.bufferedLimitIndex’ always == 0.
> >
> >
> >
> > My question to you all is, 1) Is this expected behavior? 2) If not, is it
> > a bug?
> >
> >
> >
> > Hope to get some clarity, and thanks in advance!
> >
> >
> >
> > Best,
> > Upesh
> > <https://www.itrsgroup.com/>
> > Upesh Desai​
> > Senior Software Developer
> > *udesai@itrsgroup.com* <ud...@itrsgroup.com>
> > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> > Internet communications are not secure and therefore the ITRS Group does
> > not accept legal responsibility for the contents of this message. Any
> view
> > or opinions presented are solely those of the author and do not
> necessarily
> > represent those of the ITRS Group unless otherwise specifically stated.
> > [itrs.email.signature]
> >
> >
> > *Disclaimer*
> >
> > The information contained in this communication from the sender is
> > confidential. It is intended solely for use by the recipient and others
> > authorized to receive it. If you are not the recipient, you are hereby
> > notified that any disclosure, copying, distribution or taking action in
> > relation of the contents of this information is strictly prohibited and
> may
> > be unlawful.
> >
> > This email has been scanned for viruses and malware, and may have been
> > automatically archived by *Mimecast Ltd*, an innovator in Software as a
> > Service (SaaS) for business. Providing a *safer* and *more useful* place
> > for your human generated data. Specializing in; Security, archiving and
> > compliance.
> >
>
>
> --
> -- Guozhang
>
>
> *Disclaimer*
>
> The information contained in this communication from the sender is
> confidential. It is intended solely for use by the recipient and others
> authorized to receive it. If you are not the recipient, you are hereby
> notified that any disclosure, copying, distribution or taking action in
> relation of the contents of this information is strictly prohibited and may
> be unlawful.
>
> This email has been scanned for viruses and malware, and may have been
> automatically archived by *Mimecast Ltd*, an innovator in Software as a
> Service (SaaS) for business. Providing a *safer* and *more useful* place
> for your human generated data. Specializing in; Security, archiving and
> compliance.
>


--
-- Guozhang


Re: Kafka Streams Processor API state stores not restored via changelog topics

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Upesh,

Could you confirm a few more things for me:

1. After you stop the application and wipe out the state dir, check whether
the corresponding changelog topic indeed has one record at offset 0. This
can be done via `Admin#listOffsets` (get the earliest and latest offsets,
which should be 0 and 1 respectively).
2. After you resume the application, check the starting position from which
we restore the changelog. This can be done by implementing
`stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
restoreEndOffset)`; the start offset should be 0.

If both of them check out fine as expected, then from the code I think
bufferedLimitIndex should be updated to 1.
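
The two checks above can be combined into one small decision helper. What follows is only a sketch in plain Java. It assumes you have already read the earliest and latest changelog offsets yourself (for example via `Admin#listOffsets`) and the start and end offsets reported to `onRestoreStart`. The class and method names are hypothetical, and nothing here calls Kafka.

```java
// Hypothetical helper: turns the observed offsets into a rough verdict.
// This is an illustration of the reasoning, not Kafka Streams code.
final class RestoreDiagnosisSketch {

    // The latest offset is the offset of the next record to be written, so the
    // difference is an upper bound on the record count (compaction can leave gaps).
    static long recordsInTopic(final long earliestOffset, final long latestOffset) {
        return Math.max(0L, latestOffset - earliestOffset);
    }

    // Upper bound on how many records a restore from startOffset could apply.
    static long expectedToRestore(final long startOffset, final long restoreEndOffset) {
        return Math.max(0L, restoreEndOffset - startOffset);
    }

    static String diagnose(final long earliestOffset,
                           final long latestOffset,
                           final long startOffset,
                           final long restoreEndOffset) {
        if (recordsInTopic(earliestOffset, latestOffset) == 0L) {
            return "changelog is empty; nothing can be restored";
        }
        final long expected = expectedToRestore(startOffset, restoreEndOffset);
        if (expected == 0L) {
            return "restore starts at or past the end offset; check where the start offset comes from";
        }
        return "restore should apply up to " + expected + " record(s)";
    }

    public static void main(final String[] args) {
        // Values from the scenario in this thread: one record at offset 0.
        System.out.println(diagnose(0L, 1L, 0L, 1L));
    }
}
```

If the topic holds one record (earliest 0, latest 1) but the restore reports a start offset of 1, the helper returns the second verdict, which would point the investigation at where that start offset was loaded from.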


Guozhang



-- 
-- Guozhang

Re: Kafka Streams Processor API state stores not restored via changelog topics

Posted by Upesh Desai <ud...@itrsgroup.com>.
Hi Guozhang,

Here are some of the answers to your questions I see during my testing:


  1.  `ChangelogMetadata#restoreEndOffset == 1`. This is expected, as in my test 1 record had been added to the store. However, the numRecords variable is still set to 0.
  2.  For that particular test, `hasRestoredToEnd()` indeed returns true as well. This is confusing, since the store is actually empty: the record I added is not found in the store when I check for it.
  3.  N/A

A little more information: the records we add to this store/changelog are of type <CustomKey,byte[]>, where the value is always an empty byte array `new byte[0]`. A couple of other variations I have tried set the value to a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.

Hope this gives a little more clarity and hope to hear from you soon.

Thanks,
Upesh


Upesh Desai | Senior Software Developer | udesai@itrsgroup.com
www.itrsgroup.com


Re: Kafka Streams Processor API state stores not restored via changelog topics

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Upesh,

Thanks for the detailed report. I looked through the code and tried to
reproduce the issue, but so far have not succeeded. I think I may need some
further information from you to help my further investigation.

1) `bufferedLimitIndex == 0` by itself does not necessarily mean there's
an issue, as long as it could still be bumped later (i.e. it is possible
that the restore consumer has not fetched data yet). What's key, though, is
to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would
be created with a null value and then be initialized once. ChangelogReader
would stop restoring once the current offset has reached beyond this value
or if this value itself is 0.

2) If `restoreEndOffset` is initialized to a non-zero value, then check
whether the restoration indeed completed without applying any records;
completion is indicated by `hasRestoredToEnd()` returning true.

3) If `restoreEndOffset` is initialized to 0, then we need to check why: off
the top of my head, the only cause I can think of is that the consumer's
end-offset request got a response of 0, indicating the changelog is now empty.
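
The stop condition described in the three points above can be modeled in a few lines of plain Java. This is a simplified, hypothetical sketch and not the actual `StoreChangelogReader` code. The class name and parameters are made up, an uninitialized end offset is modeled as "not finished", and "reached beyond" is modeled as `>=` on the assumption that an end offset is the offset of the last record plus one.

```java
// Simplified model of the completion check described above.
// NOT the real Kafka Streams implementation; names are illustrative only.
final class RestoreCheckSketch {

    /**
     * @param restoreEndOffset end offset captured for the changelog, or null if not yet initialized
     * @param currentOffset    next offset the restore consumer would read
     * @return true if, under this model, restoration would be considered complete
     */
    static boolean hasRestoredToEnd(final Long restoreEndOffset, final long currentOffset) {
        if (restoreEndOffset == null) {
            // Assumption: without an initialized end offset, completion cannot be declared.
            return false;
        }
        if (restoreEndOffset == 0L) {
            // The changelog was reported as empty, so there is nothing to restore.
            return true;
        }
        // Assumption: the end offset is "last record offset + 1", hence >=.
        return currentOffset >= restoreEndOffset;
    }

    public static void main(final String[] args) {
        // One record at offset 0, consumer has not read it yet.
        System.out.println(hasRestoredToEnd(1L, 0L));
        // Same changelog after the record has been consumed.
        System.out.println(hasRestoredToEnd(1L, 1L));
    }
}
```

Under this model, an end offset of 1 with the consumer still at offset 0 is not complete. A restore that reports completion with zero records applied would therefore imply that the consumer position was already at or past the end offset when restoration began.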


Guozhang




-- 
-- Guozhang