Posted to users@kafka.apache.org by Dmitry Minkovsky <dm...@gmail.com> on 2017/03/13 22:56:04 UTC

Kafka Streams: ReadOnlyKeyValueStore range behavior

I am using Interactive Queries to read from tables:

    ReadOnlyKeyValueStore<Messages.ByUserAndDate, Messages.UserLetter> store =
        streams.store("view-user-drafts", QueryableStoreTypes.keyValueStore());

Documentation says that #range() should not return null values. However,
for keys that have been tombstoned, it does return null for me.
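
As a stopgap, null values can be filtered out on the caller's side. The
following is only a sketch: it assumes the iterator returned by #range() can
be adapted to a plain java.util.Iterator of key/value entries, and
NonNullValueIterator and liveKeys are hypothetical names, not Kafka Streams
classes.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical helper, not part of Kafka Streams: wraps any iterator of
// key/value entries and skips entries whose value is null (tombstones).
final class NonNullValueIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    private final Iterator<Map.Entry<K, V>> delegate;
    private Map.Entry<K, V> next; // buffered entry with a non-null value, if any

    NonNullValueIterator(Iterator<Map.Entry<K, V>> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        // Advance the underlying iterator until a live entry is found.
        while (next == null && delegate.hasNext()) {
            Map.Entry<K, V> candidate = delegate.next();
            if (candidate.getValue() != null) {
                next = candidate;
            }
        }
        return next != null;
    }

    @Override
    public Map.Entry<K, V> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        Map.Entry<K, V> result = next;
        next = null;
        return result;
    }

    // Collects the keys of all entries that carry a non-null value.
    static <K, V> List<K> liveKeys(Iterator<Map.Entry<K, V>> raw) {
        List<K> keys = new ArrayList<>();
        Iterator<Map.Entry<K, V>> it = new NonNullValueIterator<>(raw);
        while (it.hasNext()) {
            keys.add(it.next().getKey());
        }
        return keys;
    }

    public static void main(String[] args) {
        // Stand-in for the result of a range scan that contains a tombstone.
        List<Map.Entry<String, String>> scanned = new ArrayList<>();
        scanned.add(new SimpleImmutableEntry<>("draft-1", "hello"));
        scanned.add(new SimpleImmutableEntry<>("draft-2", null));
        scanned.add(new SimpleImmutableEntry<>("draft-3", "world"));
        System.out.println(liveKeys(scanned.iterator())); // [draft-1, draft-3]
    }
}
```

This only hides the symptom on the reading side; it does not change what
the store itself returns.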

Also, I noticed only just now that "No ordering guarantees are provided." I
haven't done enough testing or read the code carefully enough yet, so I
wonder if someone who knows could confirm: is this true? Is this common to
all store implementations? I was hoping to use Interactive Queries the way
one uses HBase, to scan key ranges. It appears this is not possible.

Thank you,
Dmitry

Re: Kafka Streams: ReadOnlyKeyValueStore range behavior

Posted by Damian Guy <da...@gmail.com>.
Thanks Dmitry. Please do create a JIRA for the range scan.

Re: Kafka Streams: ReadOnlyKeyValueStore range behavior

Posted by Dmitry Minkovsky <dm...@gmail.com>.
Regarding the null bug: I had time to open a JIRA today. Looks like an
issue already exists: https://issues.apache.org/jira/browse/KAFKA-4750

Regarding scan order: I would gladly produce a sample that reproduces this
behavior if you can confirm that you consider it a defect. I would really
love to be able to do ordered, prefixed range scans with interactive
queries. But if you don't think the lack of this facility is a defect, then
I can't spend more time on this.

Thank you!

Re: Kafka Streams: ReadOnlyKeyValueStore range behavior

Posted by Dmitry Minkovsky <dm...@gmail.com>.
Ah! Yes. Thank you! That makes sense.

Anyway, I _think_ that's not what I was doing given that all items were
being routed to and then read from a partition identified by one key.

Re: Kafka Streams: ReadOnlyKeyValueStore range behavior

Posted by Damian Guy <da...@gmail.com>.
> > When you use Queryable State you are actually querying multiple
> > underlying stores, i.e., one per partition.
>
> Huh? I was only querying one partition. In my example, I have a user's
> posts. Upon creation, they are routed to a particular partition using a
> partitioner that hashes the post's user ID. The posts are then indexed on
> that partition by prefixed keys using the method described above. When
> querying, I am only querying the one partition that has all of the user's
> posts. As far as I know, I am not querying across multiple partitions.
> Furthermore, I did not even think this was possible, given the fact that
> Interactive Queries require you to manually forward requests that should go
> to other partitions.
>
>
Each KafkaStreams instance is potentially responsible for multiple
partitions, so when you use Queryable State on a particular instance you
are querying all partitions for that store on the given instance.



Re: Kafka Streams: ReadOnlyKeyValueStore range behavior

Posted by Dmitry Minkovsky <dm...@gmail.com>.
Matthias, Damian:

Thank you for your replies.

> Can you check if the problem exist for 0.10.2, too?

I will upgrade to 0.10.2 after this development cycle. I'm still in
development so compatibility is not as big an issue as getting to
production.

>  range() should return ordered data,

In my experiments, the order in which the data was returned the first time
was the order in which it was returned every subsequent time. That order
was not lexicographic, though; it appeared random.
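
For reference, whether byte order matches the logical key order depends on
the serializer. A small sketch, assuming keys are compared byte by byte as
unsigned values and using a hypothetical serializer that writes 4-byte
big-endian integers (ByteOrderDemo and its methods are illustrative names
only, not anything from Kafka):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ByteOrderDemo {
    // Hypothetical serializer: 4-byte big-endian two's complement.
    static byte[] serialize(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Lexicographic comparison that treats every byte as unsigned.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Sorts the values by their serialized form rather than numerically.
    static List<Integer> sortBySerializedBytes(List<Integer> values) {
        List<Integer> sorted = new ArrayList<>(values);
        sorted.sort((x, y) -> compareUnsigned(serialize(x), serialize(y)));
        return sorted;
    }

    public static void main(String[] args) {
        // Negative numbers start with 0xFF, so they sort after the positives.
        System.out.println(sortBySerializedBytes(Arrays.asList(-1, 2, 0, -2, 1)));
        // prints [0, 1, 2, -2, -1]
    }
}
```

The result is stable from run to run, yet it is not the numeric order, which
is one way a perfectly ordered scan can look unordered to the caller.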

> What key type and serializer do you use?

I am using Protocol Buffers, which are ordered structs. You construct the
Protocol Buffers object, and then the serializer calls ".toByteArray()" on
it to get the bytes. I thought this would be a very simple way to create keys
that, when serialized, would facilitate prefixed range scans. For example, a
Protocol Buffers message like

message {
   bytes user_id = 1;
   bytes post_id = 2;
}

when serialized, puts the user_id first and then the post_id in the
resulting byte string. Some Protocol Buffers data types use variable-length
encoding, so I was careful not to use any of these types in my keys.
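
The idea can be sketched without Protocol Buffers at all. The example below
assumes a hand-rolled key of two fixed-width 8-byte fields and uses a
TreeMap with an unsigned byte comparator as a stand-in for a single sorted
store; CompositeKeyDemo, key and postsOf are hypothetical names. One caveat
worth checking: in the Protocol Buffers wire format a `bytes` field is
itself written as a tag byte plus a varint length before the payload, so a
prefix scan over such keys relies on every user_id having the same length.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Illustrative only: a fixed-width composite key standing in for the
// Protocol Buffers key described above. All names are hypothetical.
final class CompositeKeyDemo {
    // Compares byte arrays lexicographically, treating each byte as unsigned.
    static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    // Key layout: 8-byte big-endian user ID, then 8-byte big-endian post ID.
    static byte[] key(long userId, long postId) {
        return ByteBuffer.allocate(16).putLong(userId).putLong(postId).array();
    }

    // Returns the post IDs stored under userId, in key order.
    static List<Long> postsOf(TreeMap<byte[], String> store, long userId) {
        byte[] from = key(userId, 0L);  // smallest key with this prefix
        byte[] to = key(userId, -1L);   // -1L is 0xFFFFFFFFFFFFFFFF, the largest suffix
        List<Long> posts = new ArrayList<>();
        for (byte[] k : store.subMap(from, true, to, true).keySet()) {
            posts.add(ByteBuffer.wrap(k).getLong(8)); // read the post ID field
        }
        return posts;
    }

    public static void main(String[] args) {
        TreeMap<byte[], String> store = new TreeMap<>(UNSIGNED_LEXICOGRAPHIC);
        store.put(key(2L, 1L), "another user's post");
        store.put(key(1L, 30L), "third");
        store.put(key(1L, 10L), "first");
        store.put(key(1L, 20L), "second");
        System.out.println(postsOf(store, 1L)); // [10, 20, 30]
    }
}
```

Because both fields are fixed-width and big-endian, all keys for one user
are contiguous and the post IDs come back in ascending order, provided the
IDs are non-negative.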

> When you use Queryable State you are actually querying multiple
> underlying stores, i.e., one per partition.

Huh? I was only querying one partition. In my example, I have a user's
posts. Upon creation, they are routed to a particular partition using a
partitioner that hashes the post's user ID. The posts are then indexed on
that partition by prefixed keys using the method described above. When
querying, I am only querying the one partition that has all of the user's
posts. As far as I know, I am not querying across multiple partitions.
Furthermore, I did not even think this was possible, given the fact that
Interactive Queries require you to manually forward requests that should go
to other partitions.

On Thu, Mar 16, 2017 at 2:11 PM, Damian Guy <da...@gmail.com> wrote:

> I think what you are seeing is that the order is not guaranteed across
> partitions. When you use Queryable State you are actually querying multiple
> underlying stores, i.e., one per partition. The implementation iterates
> over one store/partition at a time, so the ordering will appear random.
> This could be improved.
>
> The tombstone records appearing in the results seem like a bug.
>
> Thanks,
> Damian
>
> On Thu, 16 Mar 2017 at 17:37 Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > Can you check whether the problem exists in 0.10.2, too? (0.10.2 is
> > compatible with 0.10.1 brokers, so you can upgrade your Streams code
> > independently of the brokers.)
> >
> > About the range: I double-checked this, and I think my last answer was
> > not correct: range() should return ordered data. But I have a follow-up
> > question: what key type and serializer do you use? Internally, data
> > is stored in serialized form and ordered according to
> > `LexicographicByteArrayComparator`. Thus, if the serialized bytes
> > don't reflect the order of the deserialized data, the returned range
> > shows up unordered to you.
> >
> >
> > -Matthias
> >
> >
> >
> >
> > On 3/16/17 10:14 AM, Dmitry Minkovsky wrote:
> > > Hi Matthias. Thank you for your response.
> > >
> > > Yes, I was able to reproduce the null issue reliably. I can't open a
> > > JIRA at this time, but I can say I was using 0.10.1.0 and it was
> > > trivial to reproduce. Just send records and the tombstones to a table
> > > topic. Then scan the range. You'll see the tombstones.
> > >
> > > Indeed, ranges are returned with no specific order. I'm not sure what
> > > you mean that default stores are hash-based, but this ordering thing is
> > > a shame because it kind of kills the ability to use KS as a full-fledged
> > > DB that lets you index things like HBase (composite keys for lists of
> > > items). Is that how RocksDB works? Just returns range scans in random
> > > order? I don't know C++ so the documentation is a bit opaque to me. But
> > > what's the point of scanning a range if the data comes in some random
> > > order? That being the case, the number of possible use-case scenarios
> > > seems to become significantly limited.
> > >
> > >
> > > Thank you!
> > > Dmitry
> > >
> > > On Tue, Mar 14, 2017 at 1:12 PM, Matthias J. Sax <matthias@confluent.io>
> > > wrote:
> > >
> > >>> However, for keys that have been tombstoned, it does return null
> > >>> for me.
> > >>
> > >> Sounds like a bug. Can you reliably reproduce this? Would you mind
> > >> opening a JIRA?
> > >>
> > >> Can you check if this happens for both cases: caching enabled and
> > >> disabled? Or only for one case?
> > >>
> > >>
> > >>> "No ordering guarantees are provided."
> > >>
> > >> That is correct. Internally, default stores are hash-based -- thus, we
> > >> don't give a sorted list/iterator back. You could replace RocksDB
> > >> with a custom store though.
> > >>
> > >>
> > >> -Matthias

Re: Kafka Streams: ReadOnlyKeyValueStore range behavior

Posted by Damian Guy <da...@gmail.com>.
I think what you are seeing is that the order is not guaranteed across
partitions. When you use Queryable State you are actually querying multiple
underlying stores, i.e., one per partition. The implementation iterates
over one store/partition at a time, so the ordering will appear random.
This could be improved.
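
A minimal sketch of that effect, using only java.util rather than the Kafka
Streams API. The class PartitionedRangeScan and its methods are hypothetical,
and each TreeMap stands in for one per-partition store. Visiting the stores
one after another gives slices that are sorted individually but not overall,
while a k-way merge of the per-store iterators gives a globally sorted result:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical stand-in for a queryable store backed by several partitions.
final class PartitionedRangeScan {

    // Visits one partition at a time: each slice is sorted, the whole is not.
    static List<String> concatenated(List<TreeMap<String, String>> partitions,
                                     String from, String to) {
        List<String> keys = new ArrayList<>();
        for (TreeMap<String, String> partition : partitions) {
            keys.addAll(partition.subMap(from, true, to, true).keySet());
        }
        return keys;
    }

    // K-way merge of the per-partition iterators: globally sorted output.
    static List<String> merged(List<TreeMap<String, String>> partitions,
                               String from, String to) {
        // Each heap entry pairs the current head key with its source iterator.
        Comparator<Map.Entry<String, Iterator<String>>> byHeadKey =
            Map.Entry.comparingByKey();
        PriorityQueue<Map.Entry<String, Iterator<String>>> heap =
            new PriorityQueue<>(byHeadKey);
        for (TreeMap<String, String> partition : partitions) {
            Iterator<String> it =
                partition.subMap(from, true, to, true).keySet().iterator();
            if (it.hasNext()) {
                heap.add(Map.entry(it.next(), it));
            }
        }
        List<String> keys = new ArrayList<>();
        while (!heap.isEmpty()) {
            Map.Entry<String, Iterator<String>> head = heap.poll();
            keys.add(head.getKey());
            Iterator<String> it = head.getValue();
            if (it.hasNext()) {
                heap.add(Map.entry(it.next(), it));
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        TreeMap<String, String> p0 = new TreeMap<>(Map.of("a", "1", "d", "4"));
        TreeMap<String, String> p1 = new TreeMap<>(Map.of("b", "2", "c", "3"));
        List<TreeMap<String, String>> partitions = List.of(p0, p1);
        System.out.println(concatenated(partitions, "a", "z")); // [a, d, b, c]
        System.out.println(merged(partitions, "a", "z"));       // [a, b, c, d]
    }
}
```

The merge only helps if every underlying store already returns its own range
in key order, and it costs one heap operation per returned key.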

The tombstone records appearing in the results seem like a bug.

Thanks,
Damian

On Thu, 16 Mar 2017 at 17:37 Matthias J. Sax <ma...@confluent.io> wrote:

> Can you check if the problem exists for 0.10.2, too? (0.10.2 is
> compatible with 0.10.1 brokers -- so you can upgrade your Streams code
> independently from the brokers).
>
> About the range: I double-checked this, and I guess my last answer was
> not correct: range() should return ordered data. But I have a follow-up
> question: what key type and serializer do you use? Internally, data
> is stored in serialized form and ordered according to
> `LexicographicByteArrayComparator` -- thus, if the serialized bytes
> don't reflect the order of the deserialized data, the returned range
> shows up unordered to you.
>
>
> -Matthias

Re: Kafka Streams: ReadOnlyKeyValueStore range behavior

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Can you check if the problem exists for 0.10.2, too? (0.10.2 is
compatible with 0.10.1 brokers -- so you can upgrade your Streams code
independently from the brokers).

About the range: I double-checked this, and I guess my last answer was
not correct: range() should return ordered data. But I have a follow-up
question: what key type and serializer do you use? Internally, data
is stored in serialized form and ordered according to
`LexicographicByteArrayComparator` -- thus, if the serialized bytes
don't reflect the order of the deserialized data, the returned range
shows up unordered to you.
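
To illustrate the mismatch, here is a small sketch using only the JDK. The
class LexicographicOrderDemo and its methods are hypothetical, and
Arrays.compareUnsigned stands in for a byte-wise lexicographic comparator.
It assumes keys are longs written as 8 big-endian bytes. With that plain
encoding a negative number sorts after a positive one, while flipping the
sign bit before writing makes byte order agree with numeric order:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical demo: numeric order versus unsigned byte-wise order.
final class LexicographicOrderDemo {

    // Plain big-endian two's-complement encoding of a long.
    static byte[] plain(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Order-preserving variant: flipping the sign bit makes unsigned
    // byte order match signed numeric order.
    static byte[] orderPreserving(long value) {
        return ByteBuffer.allocate(Long.BYTES)
            .putLong(value ^ Long.MIN_VALUE)
            .array();
    }

    // Unsigned lexicographic comparison of two byte arrays (Java 9+).
    static int compareBytes(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        // -1 is 0xFF..FF and 1 is 0x00..01, so -1 sorts AFTER 1 byte-wise.
        System.out.println(compareBytes(plain(-1L), plain(1L)) > 0); // true
        // With the sign bit flipped, -1 sorts before 1 as expected.
        System.out.println(
            compareBytes(orderPreserving(-1L), orderPreserving(1L)) < 0); // true
    }
}
```

The same kind of surprise appears with numbers serialized as strings, where
"10" sorts before "9".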


-Matthias




On 3/16/17 10:14 AM, Dmitry Minkovsky wrote:
> Hi Matthias. Thank you for your response.
> 
> Yes, I was able to reproduce the null issue reliably. I can't open a JIRA
> at this time, but I can say I was using 0.10.1.0 and it was trivial to
> reproduce. Just send records and then their tombstones to a table topic.
> Then scan the range. You'll see the tombstones.
>
> Indeed, ranges are returned in no specific order. I'm not sure what you
> mean by default stores being hash-based, but this ordering thing is a shame
> because it kind of kills the ability to use KS as a full-fledged DB that
> lets you index things like HBase (composite keys for lists of items). Is
> that how RocksDB works? Does it just return range scans in random order? I
> don't know C++, so the documentation is a bit opaque to me. But what's the
> point of scanning a range if the data comes in some random order? That being
> the case, the number of possible use cases seems to become
> significantly limited.
> 
> 
> Thank you!
> Dmitry


Re: Kafka Streams: ReadOnlyKeyValueStore range behavior

Posted by Dmitry Minkovsky <dm...@gmail.com>.
Hi Matthias. Thank you for your response.

Yes, I was able to reproduce the null issue reliably. I can't open a JIRA
at this time, but I can say I was using 0.10.1.0 and it was trivial to
reproduce. Just send records and then their tombstones to a table topic.
Then scan the range. You'll see the tombstones.
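
Until that is fixed, a caller can skip such entries defensively. The sketch
below is only an illustration over plain java.util types: SkipNullValues is
a hypothetical wrapper, and Map.Entry stands in for the key/value pairs that
the store's range iterator returns. It drops every entry whose value is
null:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical wrapper that hides entries with null values (tombstones).
final class SkipNullValues<K, V> implements Iterator<Map.Entry<K, V>> {
    private final Iterator<Map.Entry<K, V>> inner;
    private Map.Entry<K, V> next; // next non-null entry, or null when exhausted

    SkipNullValues(Iterator<Map.Entry<K, V>> inner) {
        this.inner = inner;
        advance();
    }

    // Moves to the next entry whose value is not null.
    private void advance() {
        next = null;
        while (inner.hasNext()) {
            Map.Entry<K, V> candidate = inner.next();
            if (candidate.getValue() != null) {
                next = candidate;
                return;
            }
        }
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public Map.Entry<K, V> next() {
        if (next == null) {
            throw new NoSuchElementException();
        }
        Map.Entry<K, V> result = next;
        advance();
        return result;
    }

    // Convenience helper for the demo: collects the remaining keys.
    static <K, V> List<K> keysOf(Iterator<Map.Entry<K, V>> it) {
        List<K> keys = new ArrayList<>();
        while (it.hasNext()) {
            keys.add(it.next().getKey());
        }
        return keys;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> scan = new ArrayList<>();
        scan.add(new SimpleImmutableEntry<>("a", "1"));
        scan.add(new SimpleImmutableEntry<>("b", null)); // tombstoned key
        scan.add(new SimpleImmutableEntry<>("c", "3"));
        SkipNullValues<String, String> filtered =
            new SkipNullValues<>(scan.iterator());
        System.out.println(keysOf(filtered)); // [a, c]
    }
}
```

With the real store, the iterator returned by range() also has to be closed
after use, which this sketch leaves out.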

Indeed, ranges are returned in no specific order. I'm not sure what you
mean by default stores being hash-based, but this ordering thing is a shame
because it kind of kills the ability to use KS as a full-fledged DB that
lets you index things like HBase (composite keys for lists of items). Is
that how RocksDB works? Does it just return range scans in random order? I
don't know C++, so the documentation is a bit opaque to me. But what's the
point of scanning a range if the data comes in some random order? That being
the case, the number of possible use cases seems to become
significantly limited.
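
For reference, this is the kind of composite-key scan I have in mind,
sketched with a sorted in-memory map instead of a real store. Everything
here is hypothetical: the class CompositeKeyScan, the key layout (UTF-8 user
id, one 0x00 separator byte, 8-byte big-endian timestamp), and the
assumptions that user ids never contain a 0x00 byte and timestamps are
never negative. Given a store sorted by unsigned byte order, one range scan
returns a single user's items in timestamp order:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical composite-key layout for "items per user, ordered by time".
final class CompositeKeyScan {

    // Key layout: <user id bytes> 0x00 <8-byte big-endian timestamp>.
    static byte[] key(String userId, long epochMillis) {
        byte[] user = userId.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(user.length + 1 + Long.BYTES)
            .put(user)
            .put((byte) 0)
            .putLong(epochMillis)
            .array();
    }

    // A sorted map ordered by unsigned bytes, standing in for a sorted store.
    static TreeMap<byte[], String> newStore() {
        Comparator<byte[]> unsignedBytes = Arrays::compareUnsigned;
        return new TreeMap<>(unsignedBytes);
    }

    // Scans [key(user, 0), key(user, Long.MAX_VALUE)], both ends inclusive.
    static List<String> scanUser(TreeMap<byte[], String> store, String userId) {
        byte[] from = key(userId, 0L);
        byte[] to = key(userId, Long.MAX_VALUE);
        return new ArrayList<>(store.subMap(from, true, to, true).values());
    }

    public static void main(String[] args) {
        TreeMap<byte[], String> store = newStore();
        store.put(key("alice", 200L), "second draft");
        store.put(key("bob", 150L), "bob's draft");
        store.put(key("alice", 100L), "first draft");
        store.put(key("al", 50L), "al's draft");
        System.out.println(scanUser(store, "alice")); // [first draft, second draft]
    }
}
```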


Thank you!
Dmitry

On Tue, Mar 14, 2017 at 1:12 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> > However,
> > for keys that have been tombstoned, it does return null for me.
>
> Sounds like a bug. Can you reliably reproduce this? Would you mind
> opening a JIRA?
>
> Can you check if this happens for both cases: caching enabled and
> disabled? Or only for one case?
>
>
> > "No ordering guarantees are provided."
>
> That is correct. Internally, default stores are hash-based -- thus, we
> don't give a sorted list/iterator back. You could replace RocksDB with a
> custom store though.
>
>
> -Matthias

Re: Kafka Streams: ReadOnlyKeyValueStore range behavior

Posted by "Matthias J. Sax" <ma...@confluent.io>.
> However,
> for keys that have been tombstoned, it does return null for me.

Sounds like a bug. Can you reliably reproduce this? Would you mind
opening a JIRA?

Can you check if this happens for both cases: caching enabled and
disabled? Or only for one case?
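
As a rough sketch of how to switch between the two cases: in the 0.10.1+
releases the record cache is sized by the "cache.max.bytes.buffering"
setting, and a value of 0 disables it. The helper class CachingToggle below
is hypothetical, and the 10 MB figure is just an illustrative size. It only
builds the Properties that would be passed to the Streams application:

```java
import java.util.Properties;

// Hypothetical helper that derives a config with caching on or off.
final class CachingToggle {

    // Streams setting for the total record cache size, in bytes.
    static final String CACHE_SIZE_KEY = "cache.max.bytes.buffering";

    // Copies the base config and sets the cache size: 0 turns caching off.
    static Properties withCaching(Properties base, boolean enabled) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(CACHE_SIZE_KEY, enabled ? 10L * 1024 * 1024 : 0L);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "range-repro"); // hypothetical app id
        System.out.println(withCaching(base, false).get(CACHE_SIZE_KEY)); // 0
    }
}
```

Running the same reproduction once with each variant shows whether the null
values come from the caching layer or from the underlying store.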


> "No ordering guarantees are provided."

That is correct. Internally, default stores are hash-based -- thus, we
don't give a sorted list/iterator back. You could replace RocksDB with a
custom store though.


-Matthias


On 3/13/17 3:56 PM, Dmitry Minkovsky wrote:
> I am using interactive streams to query tables:
> 
>             ReadOnlyKeyValueStore<Messages.ByUserAndDate,
> Messages.UserLetter> store
>               = streams.store("view-user-drafts",
> QueryableStoreTypes.keyValueStore());
> 
> Documentation says that #range() should not return null values. However,
> for keys that have been tombstoned, it does return null for me.
> 
> Also, I noticed only just now that "No ordering guarantees are provided." I
> haven't done enough testing or looked at the code carefully enough yet and
> wonder if someone who knows could confirm: is this true? Is this common to
> all store implementations? I was hoping to use interactive streams like
> HBase to scan ranges. It appears this is not possible.
> 
> Thank you,
> Dmitry
>