Posted to users@kafka.apache.org by Christian Henry <ch...@gmail.com> on 2018/07/02 16:29:12 UTC

Re: Possible bug? Duplicates when searching kafka stream state store with caching

We're using the latest Kafka (1.1.0). I'd like to note that when we
encounter duplicates, the window is the same as well.

My original code was a bit simplified -- we also insert into the store when
iterator.hasNext() is true, before returning null. We're using a window
store because we have a punctuator that runs every few minutes to count
GUIDs with similar metadata and report that in a healthcheck. Since our
healthcheck window is shorter than the retention period of the store
(retention period might be 1 hour, healthcheck window is ~5 min), the
window store seemed like a good way to efficiently query all of the most
recent data. Note that since the healthcheck punctuator needs to aggregate
over all the recent values, it has to do a *fetchAll(start, end)*, which is
how these duplicates are affecting us.
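
For context, the punctuator is shaped roughly like this (a condensed sketch
against the 1.1.0 API, where context is the ProcessorContext; the metadata
accessor and the healthcheck reporting hook are made-up names):

context.schedule(TimeUnit.MINUTES.toMillis(5), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
    long start = timestamp - TimeUnit.MINUTES.toMillis(5);
    Map<String, Integer> countsByMetadata = new HashMap<>();
    // aggregate over all recent values, across all keys
    try (KeyValueIterator<Windowed<String>, DuplicateMessageMetadata> iterator =
             duplicateStore.fetchAll(start, timestamp)) {
        while (iterator.hasNext()) {
            KeyValue<Windowed<String>, DuplicateMessageMetadata> entry = iterator.next();
            // count GUIDs that share the same metadata
            countsByMetadata.merge(entry.value.getMetadataKey(), 1, Integer::sum);
        }
    }
    healthcheck.report(countsByMetadata); // made-up reporting hook
});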

On Fri, Jun 29, 2018 at 7:32 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello Christian,
>
> Since you are calling fetch(key, start, end), I'm assuming that
> duplicateStore is a windowed store. With a windowed store, it is possible
> that a single key falls into multiple windows, and hence is returned from
> the WindowStoreIterator more than once; note its type is <Windowed<K>, V>.
>
> So I'd first want to know:
>
> 1) which Kafka version you are using.
> 2) why you need a window store, and if you do, whether you could consider
> using the single-point fetch (added in KAFKA-6560) rather than the range
> query (which is also more expensive).
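>
> For reference, the single-point fetch is shaped roughly like this (a
> sketch; windowStartTimestamp is a made-up name for the window start time
> the key was written at):
>
> DuplicateMessageMetadata value = duplicateStore.fetch(guid, windowStartTimestamp);
> boolean alreadySeen = (value != null);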
>
>
>
> Guozhang
>
>
> On Fri, Jun 29, 2018 at 11:38 AM, Christian Henry <christian.henry92@gmail.com> wrote:
>
> > Hi all,
> >
> > I'll first describe a simplified view of the relevant parts of our setup
> > (which should be enough to repro), describe the behavior we're seeing, and
> > then note some information I've come across after digging in a bit.
> >
> > We have a Kafka Streams application, and one of our transform steps
> > keeps a state store to filter out messages with a previously seen GUID.
> > That is, our transform looks like:
> >
> > public KeyValue<byte[], String> transform(byte[] key, String guid) {
> >     try (WindowStoreIterator<DuplicateMessageMetadata> iterator =
> >              duplicateStore.fetch(guid, start, now)) {
> >         if (iterator.hasNext()) {
> >             // already seen this GUID within the window: filter it out
> >             return null;
> >         } else {
> >             duplicateStore.put(guid, metadata); // metadata details elided
> >             return new KeyValue<>(key, guid);
> >         }
> >     }
> > }
> >
> > where the duplicateStore is a persistent windowed store with caching
> > enabled.
> >
> > I was debugging some tests and found that sometimes, when calling
> > *all()* or *fetchAll()* on the duplicate store and stepping through the
> > iterator, it would return the same guid more than once, even if it was
> > only inserted into the store once. More specifically, if I had the
> > following guids sent to the stream: [11111, 22222, ... 99999] (for 9
> > values total), sometimes it would return 10 values, with one (or more)
> > of the values being returned twice by the iterator. However, this would
> > not show up with a *fetch(guid)* on that specific guid. For instance, if
> > 11111 was being returned twice by *fetchAll()*, calling
> > *duplicateStore.fetch("11111", start, end)* would still return an
> > iterator of size 1.
> >
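> > For illustration, the loop that surfaces the extra value is roughly (a
> > condensed sketch of the test code, not verbatim):
> >
> > try (KeyValueIterator<Windowed<String>, DuplicateMessageMetadata> iterator =
> >          duplicateStore.fetchAll(start, end)) {
> >     int count = 0;
> >     while (iterator.hasNext()) {
> >         System.out.println(iterator.next().key.key()); // one guid prints twice
> >         count++;
> >     }
> >     // count is sometimes 10 even though only 9 distinct guids were inserted
> > }
> >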
> > I dug into this a bit more by setting a breakpoint in
> > *SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey)* and
> > watching the two input values as I looped through the iterator using
> > *while (iterator.hasNext()) { print(iterator.next()) }*. In one test, the
> > duplicate value was 66666, and I saw the following behavior (trimming the
> > segment values off the byte input):
> >
> > -- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
> > -- next() returns 66666
> >
> > and
> >
> > -- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
> > -- next() returns 66666
> >
> > Besides those, the input values are the same and the output is as
> > expected. Additionally, a coworker noted that the number of duplicates
> > always matches the number of times *Long.compare(cacheSegmentId,
> > storeSegmentId)* returns a non-zero value, indicating that the duplicates
> > likely arise from the segment comparison.
> >
>
>
>
> --
> -- Guozhang
>

Re: Possible bug? Duplicates when searching kafka stream state store with caching

Posted by Guozhang Wang <wa...@gmail.com>.
Yes, please create a JIRA reporting this: the `all()` and `fetchAll()`
source code has not been modified since it was first added to the
ReadOnlyWindowStore API, so this is likely a lurking bug. Please also
attach your code / sample data if possible, to help us reproduce the issue
and investigate further.



Guozhang


On Fri, Jul 6, 2018 at 7:41 AM, Christian Henry <christian.henry92@gmail.com> wrote:

> Any other ideas here? Should I create a bug?



-- 
-- Guozhang

Re: Possible bug? Duplicates when searching kafka stream state store with caching

Posted by Christian Henry <ch...@gmail.com>.
Any other ideas here? Should I create a bug?

On Tue, Jul 3, 2018 at 1:21 PM, Christian Henry <christian.henry92@gmail.com> wrote:

> Nope, we're setting retainDuplicates to false.

Re: Possible bug? Duplicates when searching kafka stream state store with caching

Posted by Christian Henry <ch...@gmail.com>.
Nope, we're setting retainDuplicates to false.

On Tue, Jul 3, 2018 at 6:55 AM, Damian Guy <da...@gmail.com> wrote:

> Hi,
>
> When you create your window store, do you have `retainDuplicates` set to
> `true`? I.e., assuming you use `Stores.persistentWindowStore(...)`, is the
> last param `true`?
>
> Thanks,
> Damian

Re: Possible bug? Duplicates when searching kafka stream state store with caching

Posted by Damian Guy <da...@gmail.com>.
Hi,

When you create your window store, do you have `retainDuplicates` set to
`true`? I.e., assuming you use `Stores.persistentWindowStore(...)`, is the
last param `true`?
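
For reference, creating the store along those lines looks roughly like this
(a sketch against the 1.1.0 API; the store name, retention, segment count,
window size, and the metadata serde are illustrative):

StoreBuilder<WindowStore<String, DuplicateMessageMetadata>> builder =
    Stores.windowStoreBuilder(
            Stores.persistentWindowStore(
                    "duplicate-store",            // store name
                    TimeUnit.HOURS.toMillis(1),   // retention period
                    3,                            // number of segments
                    TimeUnit.MINUTES.toMillis(5), // window size
                    false),                       // retainDuplicates -- the param in question
            Serdes.String(),
            duplicateMetadataSerde)               // assumed custom serde
        .withCachingEnabled();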

Thanks,
Damian

On Mon, 2 Jul 2018 at 17:29 Christian Henry <ch...@gmail.com> wrote:

> We're using the latest Kafka (1.1.0). I'd like to note that when we
> encounter duplicates, the window is the same as well.