Posted to users@kafka.apache.org by Christian Henry <ch...@gmail.com> on 2018/06/29 18:38:52 UTC

Possible bug? Duplicates when searching kafka stream state store with caching

Hi all,

I'll first describe a simplified view of relevant parts of our setup (which
should be enough to repro), describe the behavior we're seeing, and then
note some information I've come across after digging in a bit.

We have a Kafka Streams application, and one of our transform steps keeps a
state store to filter out messages with a previously seen GUID. That is,
our transform looks like:

public KeyValue<byte[], String> transform(byte[] key, String guid) {
    // Look for an earlier record with the same id inside the time range.
    try (WindowStoreIterator<DuplicateMessageMetadata> iterator =
             duplicateStore.fetch(correlationId, start, now)) {
        if (iterator.hasNext()) {
            return null;  // already seen: drop the message
        } else {
            duplicateStore.put(correlationId, some metadata);
            return new KeyValue<>(key, message);
        }
    }
}

where the duplicateStore is a persistent windowed store with caching
enabled.

I was debugging some tests and found that sometimes when calling
*all()* or *fetchAll()* on the duplicate store and stepping through the
iterator, it would return
the same guid more than once, even if it was only inserted into the store
once. More specifically, if I had the following guids sent to the stream:
[11111, 22222, ... 99999] (for 9 values total), sometimes it would return
10 values, with one (or more) of the values being returned twice by the
iterator. However, this would not show up with a *fetch(guid)* on that
specific guid. For instance, if 11111 was being returned twice by
*fetchAll()*, calling *duplicateStore.fetch("11111", start, end)* will
still return an iterator with size of 1.
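
One way to confirm this kind of duplicate in a test is to count how often each (key, window start) pair comes back from the iterator. The sketch below is self-contained and does not touch Kafka: `DuplicateCheck`, `entry` and `findDuplicates` are hypothetical names, and the list stands in for whatever a `fetchAll()` iterator returned.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DuplicateCheck {
    // Hypothetical helper: pairs a record key with its window start timestamp.
    static Map.Entry<String, Long> entry(String key, long windowStart) {
        return new SimpleImmutableEntry<>(key, windowStart);
    }

    // Returns every (key, windowStart) pair that was iterated more than once,
    // mapped to the number of times it was seen.
    static Map<Map.Entry<String, Long>, Integer> findDuplicates(
            List<Map.Entry<String, Long>> iterated) {
        Map<Map.Entry<String, Long>, Integer> counts = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : iterated) {
            counts.merge(e, 1, Integer::sum);
        }
        counts.values().removeIf(count -> count < 2);
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> iterated = new ArrayList<>();
        for (int i = 1; i <= 9; i++) {
            iterated.add(entry(String.valueOf(i * 11111), 0L));
        }
        iterated.add(entry("66666", 0L)); // simulate the unexpected tenth record
        System.out.println(findDuplicates(iterated));
    }
}
```

Comparing on the pair rather than on the key alone matters here, because a window store can legitimately return the same key once per window.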

I dug into this a bit more by setting a breakpoint in
*SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey)* and
watching the two input values as I looped through the iterator using
"*while (iterator.hasNext()) { print(iterator.next()) }*". In one test, the
duplicate value was 66666, and I saw the following behavior
(trimming off the segment values from the byte input):
-- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
-- next() returns 66666
and
-- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
-- next() returns 66666
Besides those, the input values are the same and the output is as expected.
Additionally, a coworker noted that the number of duplicates always matches
the number of times *Long.compare(cacheSegmentId, storeSegmentId)* returns
a non-zero value, indicating that duplicates are likely arising due to the
segment comparison.
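
To make the suspicion about the segment comparison concrete, here is a toy model of merging a sorted cache iterator with a sorted store iterator. It is not Kafka's implementation and does not reproduce the trace above: `MergeSketch`, `SegKey` and both comparators are hypothetical, and the assumption that the same key can carry different segment ids on the two sides is made purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class MergeSketch {
    // Hypothetical stand-in for a segmented key: a segment id plus the record key.
    static final class SegKey {
        final long segmentId;
        final String key;
        SegKey(long segmentId, String key) { this.segmentId = segmentId; this.key = key; }
    }

    // Compares the segment id first and only then the key.
    static final Comparator<SegKey> SEGMENT_THEN_KEY = (a, b) -> {
        int bySegment = Long.compare(a.segmentId, b.segmentId);
        return bySegment != 0 ? bySegment : a.key.compareTo(b.key);
    };

    // Ignores the segment id entirely.
    static final Comparator<SegKey> KEY_ONLY = (a, b) -> a.key.compareTo(b.key);

    // Two-way merge: when both heads compare equal, emit once and advance both.
    static List<String> merge(List<SegKey> cache, List<SegKey> store, Comparator<SegKey> cmp) {
        List<String> out = new ArrayList<>();
        int c = 0, s = 0;
        while (c < cache.size() || s < store.size()) {
            if (c == cache.size()) { out.add(store.get(s++).key); continue; }
            if (s == store.size()) { out.add(cache.get(c++).key); continue; }
            int order = cmp.compare(cache.get(c), store.get(s));
            if (order < 0) {
                out.add(cache.get(c++).key);
            } else if (order > 0) {
                out.add(store.get(s++).key);
            } else {
                out.add(cache.get(c++).key); // same entry on both sides
                s++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumption: key 66666 sits in segment 0 in the cache but segment 1 in the store.
        List<SegKey> cache = Arrays.asList(new SegKey(0, "66666"), new SegKey(0, "77777"));
        List<SegKey> store = Arrays.asList(new SegKey(0, "22222"), new SegKey(1, "66666"));
        System.out.println(merge(cache, store, SEGMENT_THEN_KEY));
        System.out.println(merge(cache, store, KEY_ONLY));
    }
}
```

In this model the segment-first comparator never reports the two 66666 entries as equal, so the merge emits the key from both sides, while the key-only comparator collapses them into one. Whether something similar happens inside the caching layer is exactly what would need to be confirmed.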

Re: Possible bug? Duplicates when searching kafka stream state store with caching

Posted by Guozhang Wang <wa...@gmail.com>.
Yes, please create a JIRA reporting this: the `all()` and `fetchAll()`
source code has not been modified since it was first added to the
ReadOnlyWindowStore API, so a lurking bug has likely caused the issue. Please
also attach your code / sample data if possible, to help us reproduce the
issue and investigate further.



Guozhang


On Fri, Jul 6, 2018 at 7:41 AM, Christian Henry <christian.henry92@gmail.com> wrote:

> Any other ideas here? Should I create a bug?

Re: Possible bug? Duplicates when searching kafka stream state store with caching

Posted by Christian Henry <ch...@gmail.com>.
Any other ideas here? Should I create a bug?

On Tue, Jul 3, 2018 at 1:21 PM, Christian Henry <christian.henry92@gmail.com> wrote:

> Nope, we're setting retainDuplicates to false.

Re: Possible bug? Duplicates when searching kafka stream state store with caching

Posted by Christian Henry <ch...@gmail.com>.
Nope, we're setting retainDuplicates to false.

On Tue, Jul 3, 2018 at 6:55 AM, Damian Guy <da...@gmail.com> wrote:

> Hi,
>
> When you create your window store do you have `retainDuplicates` set to
> `true`? i.e., assuming you use `Stores.persistentWindowStore(...)` is the
> last param `true`?
>
> Thanks,
> Damian

Re: Possible bug? Duplicates when searching kafka stream state store with caching

Posted by Damian Guy <da...@gmail.com>.
Hi,

When you create your window store do you have `retainDuplicates` set to
`true`? i.e., assuming you use `Stores.persistentWindowStore(...)` is the
last param `true`?

Thanks,
Damian

On Mon, 2 Jul 2018 at 17:29 Christian Henry <ch...@gmail.com>
wrote:

> We're using the latest Kafka (1.1.0). I'd like to note that when we
> encounter duplicates, the window is the same as well.
>
> My original code was a bit simplified -- we also insert into the store if
> iterator.hasNext() is true, before returning null. We're using a window
> store because we have a punctuator that runs every few minutes to count
> GUIDs with similar metadata, and reports that in a healthcheck. Since our
> healthcheck window is less than the retention period of the store
> (retention period might be 1 hour, healthcheck window is ~5 min), the
> window store seemed like a good way to efficiently query all of the most
> recent data. Note that since the healthcheck punctuator needs to aggregate
> on all the recent values, it has to do a *fetchAll(start, end)*, which is
> how these duplicates are affecting us.

Re: Possible bug? Duplicates when searching kafka stream state store with caching

Posted by Christian Henry <ch...@gmail.com>.
We're using the latest Kafka (1.1.0). I'd like to note that when we
encounter duplicates, the window is the same as well.

My original code was a bit simplified -- we also insert into the store if
iterator.hasNext() is true, before returning null. We're using a window
store because we have a punctuator that runs every few minutes to count
GUIDs with similar metadata, and reports that in a healthcheck. Since our
healthcheck window is less than the retention period of the store
(retention period might be 1 hour, healthcheck window is ~5 min), the
window store seemed like a good way to efficiently query all of the most
recent data. Note that since the healthcheck punctuator needs to aggregate
on all the recent values, it has to do a *fetchAll(start, end)*, which is
how these duplicates are affecting us.
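
As an illustration of the healthcheck aggregation described above, the following self-contained sketch counts records per metadata value over the most recent window and skips any (GUID, timestamp) pair it has already counted. All names here (`HealthcheckSketch`, `Row`, `countByMetadata`) are hypothetical and the list stands in for the result of a range scan; the de-duplication step is only a defensive idea under those assumptions, not a fix for the underlying behavior.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class HealthcheckSketch {
    // Hypothetical flattened view of one record returned by a range scan.
    static final class Row {
        final String guid;
        final long timestamp;
        final String metadata;
        Row(String guid, long timestamp, String metadata) {
            this.guid = guid;
            this.timestamp = timestamp;
            this.metadata = metadata;
        }
    }

    // Counts rows per metadata value inside [now - window, now],
    // counting each (guid, timestamp) pair at most once.
    static Map<String, Integer> countByMetadata(List<Row> rows, long now, Duration window) {
        long start = now - window.toMillis();
        Set<String> seen = new HashSet<>();
        Map<String, Integer> counts = new TreeMap<>();
        for (Row r : rows) {
            if (r.timestamp < start || r.timestamp > now) {
                continue; // outside the healthcheck window
            }
            if (!seen.add(r.guid + "@" + r.timestamp)) {
                continue; // this pair was already counted
            }
            counts.merge(r.metadata, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long now = Duration.ofHours(1).toMillis();
        List<Row> rows = Arrays.asList(
            new Row("11111", now - 1_000, "typeA"),
            new Row("22222", now - 2_000, "typeA"),
            new Row("22222", now - 2_000, "typeA"),   // repeated by the iterator
            new Row("33333", now - 600_000, "typeB")  // older than 5 minutes
        );
        System.out.println(countByMetadata(rows, now, Duration.ofMinutes(5)));
    }
}
```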

On Fri, Jun 29, 2018 at 7:32 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello Christian,
>
> Since you are calling fetch(key, start, end), I'm assuming that
> duplicateStore is a WindowStore. With a windowed store, it is possible
> that a single key can fall into multiple windows, and hence be returned
> more than once from the WindowStoreIterator;
> note its type is <Windowed<K>, V>.
>
> So I'd first want to know
>
> 1) which Kafka version you are using.
> 2) why you need a window store, and if you do, whether you could use the
> single point fetch (added in KAFKA-6560) rather than the range query (which
> is also more expensive).
>
>
>
> Guozhang

Re: Possible bug? Duplicates when searching kafka stream state store with caching

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Christian,

Since you are calling fetch(key, start, end), I'm assuming that duplicateStore
is a WindowStore. With a windowed store, it is possible that a single key
can fall into multiple windows, and hence be returned more than once from the
WindowStoreIterator;
note its type is <Windowed<K>, V>.

So I'd first want to know

1) which Kafka version you are using.
2) why you need a window store, and if you do, whether you could use the
single point fetch (added in KAFKA-6560) rather than the range query (which
is also more expensive).
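
The difference between the two kinds of lookup can be sketched with a small in-memory model. This is not the Kafka API: `WindowModel` and its methods are hypothetical stand-ins, with a range `fetch` that returns every window of a key in a time span and a point `fetch` that returns the value for exactly one window start.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class WindowModel {
    // key -> (window start timestamp -> value); a toy stand-in for a window store.
    private final Map<String, NavigableMap<Long, String>> data = new HashMap<>();

    void put(String key, String value, long windowStart) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStart, value);
    }

    // Range query: every window of `key` whose start lies in [from, to].
    NavigableMap<Long, String> fetch(String key, long from, long to) {
        NavigableMap<Long, String> windows = data.get(key);
        if (windows == null) {
            return new TreeMap<>();
        }
        return windows.subMap(from, true, to, true);
    }

    // Point query: the value for exactly one window start, or null if absent.
    String fetch(String key, long windowStart) {
        NavigableMap<Long, String> windows = data.get(key);
        return windows == null ? null : windows.get(windowStart);
    }

    public static void main(String[] args) {
        WindowModel store = new WindowModel();
        store.put("11111", "first", 1_000L);
        store.put("11111", "second", 2_000L);
        // The same key appears once per window in a range query.
        System.out.println(store.fetch("11111", 0L, 5_000L));
        // A point query addresses a single window.
        System.out.println(store.fetch("11111", 2_000L));
    }
}
```

In this model a key written at two different timestamps shows up twice in the range result, which is expected behavior rather than a duplicate; only a repeated (key, window start) pair would be suspicious.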



Guozhang

