Posted to dev@kafka.apache.org by Mikael Högqvist <ho...@gmail.com> on 2016/09/04 22:08:33 UTC

Re: Queryable state client read guarantees

Hi Eno,

thanks for the response and sorry for not getting back earlier. I think it
makes sense and the example is great! To make it possible to experiment
with the guarantees/semantics, I've created a tool available at:
https://github.com/mkhq/kafka-qs-verify. Basically it can be used to trace
read requests over time, e.g. one read per second using a client that
queries multiple instances.

By doing this I observed a couple of interesting things with the current
implementation. Note that these observations may also be the result of me
using Kafka Streams in the wrong way, e.g. wrong assumptions, setup or tool
implementation. The topology used by the tool counts strings and stores
the counts in an output table.

The experiments are set up with an input topic containing "hello" 5 times.
StoreUnavailable in the traces below means that the call to
streams.store(table) returned null, and KeyNotFound means that
store.get(key) returned null. The first run of the tool creates a trace
where the store is unavailable, followed by key not found and finally the
correct value of 5.

> Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> hello -> 5
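
The classification behind these trace lines can be sketched as follows. This
is a minimal sketch under assumptions, not the tool's actual code: a plain
Map stands in for the store returned by streams.store(table), a null Map
models that call returning null, and the names ReadOutcomeSketch and read
are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the read path behind the traces. A Map stands in for the
// queryable store; a null map models streams.store(table) returning null.
final class ReadOutcomeSketch {

    // Hypothetical helper: returns the trace line for one read attempt.
    static String read(Map<String, Long> storeOrNull, String key) {
        if (storeOrNull == null) {
            // The store lookup itself failed.
            return "Failed to read key " + key + ", StoreUnavailable";
        }
        Long value = storeOrNull.get(key);
        if (value == null) {
            // The store exists but has no entry for the key.
            return "Failed to read key " + key + ", KeyNotFound";
        }
        return key + " -> " + value;
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        System.out.println(read(null, "hello"));   // store not yet available
        System.out.println(read(store, "hello"));  // store up, key missing
        store.put("hello", 5L);
        System.out.println(read(store, "hello"));  // count is visible
    }
}
```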

The second run, i.e. a restart of the java app with the same app-id and
table name as the previous run, creates the following trace:

> Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> hello -> 10

I assumed that two things would be different:

a) KeyNotFound should not happen since the store was already initialized
and had associated hello with 5. This could violate the "never read an
older value"-rule if an instance restarts.

b) hello should still have the value 5.

Does a) have to do with the start-up sequence of the stream instance? E.g.
can the replay of the changelog for a table take longer?

For b) it looks like the stream instance re-processes the input topic data.
Maybe it's related to the setup? In this case, the consumer config
"AUTO_OFFSET_RESET_CONFIG" was not defined in the settings. Setting it to
"latest" results in only KeyNotFound.

I've also tried out a couple of other scenarios when querying multiple
instances with "random" restarts, but maybe it's better if we start with the
simplest cases.

Thanks,
Mikael

> One more thing, there is an example of an end-to-end REST service that demonstrates one possible
> way to query at
> <https://github.com/confluentinc/examples/tree/master/kafka-streams/src/main/java/io/confluent/examples/streams/queryablestate>.
> The instructions on how to run it are in QueryableStateExample.java.
>
> Thanks
> Eno
>
>
> > On 26 Aug 2016, at 18:07, Eno Thereska <en...@gmail.com> wrote:
> >
> > Hi Mikael,
> >
> > Very good question. You are correct about the desired semantics.
> >
> > The semantics of case (a) depend on the local store, as you mention. For case (b), the
> > final check is always performed again on get(), and if the store has disappeared between the
> > lookup and the get, the user will get an exception and will have to retry. The state store in
> > A does become invalid when the state is re-assigned. There isn't any other way to detect the
> > change, since we wanted to hide the system details (e.g., rebalance) from the user.
> >
> > Does this make sense?
> >
> > Thanks
> > Eno
> >
> >> On 26 Aug 2016, at 16:26, Mikael Högqvist <ho...@gmail.com> wrote:
> >>
> >> Hi,
> >>
> >> I've tried to understand the implementation and APIs from KIP-67 and would
> >> like to know the possible semantics for read requests from a client
> >> perspective. As a developer of a queryable state client, the access
> >> semantics I would like to have (I think...) is one where subsequent reads
> >> always return the value from the last read or a newer value (if the state
> >> store is available). This should be independent of the current system
> >> configuration, e.g. re-balancing, failures, etc.
> >>
> >> A client-side get(k) can be implemented by starting with a lookup for the
> >> instances that store k followed by a retrieve of the value associated with
> >> k from the instances returned by the lookup. In the worst case we can
> >> always do scatter+gather over all instances.
> >>
> >> We can start by considering a get(k) under two failure-free cases: a)
> >> single instance and b) a system going from one instance to two instances. In
> >> case a) the lookup will always return the same instance and the following
> >> get will read from a local store. The semantics in this case depends on the
> >> local store.
> >>
> >> For case b) the lookup returns instance A, but in between the lookup and
> >> the get, a new instance B is introduced to which k is transferred. Does the
> >> state store on A become invalid when the state is re-assigned? Is there
> >> another way for the client to detect the change?
> >>
> >> Best Regards,
> >> Mikael
> >
>
>
>

Re: Queryable state client read guarantees

Posted by Mikael Högqvist <ho...@gmail.com>.
Hi,

this helps, thanks!

Basically, after each read, I'll check if the key is still supposed to be
on the host. Doing the check after the read is necessary to handle the case
when a rebalance happens in between the metadata lookup and the store get.
When checking after the read, it may happen that a valid read becomes
invalid, but that doesn't affect correctness.

During a rebalance the service responds with either "not available" or a
redirect. After the rebalance has completed, the store responds with a
redirect. With a REST API, this could mean either a 404 or a 303 (See
Other) redirect to the current host.
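
A minimal sketch of this read-then-check order, under stated assumptions:
localGet stands in for the local store read and throws
IllegalStateException while the store is unavailable, ownerOf stands in for
a metadata lookup and returns null when the owner is unknown, and the status
codes follow the 404/303 mapping suggested above. The names
PostReadCheckSketch and handle are hypothetical.

```java
import java.util.function.Function;

final class PostReadCheckSketch {

    // Hypothetical handler for a read of `key` on the instance named `self`.
    // Returns "200 <value>", "303 <host>" or "404".
    static String handle(String key, String self,
                         Function<String, Long> localGet,
                         Function<String, String> ownerOf) {
        Long value;
        try {
            value = localGet.apply(key);
        } catch (IllegalStateException e) {
            return "404";                  // store not available
        }
        // Ownership is checked after the read, so a rebalance between the
        // metadata lookup and the get cannot produce a stale answer.
        String owner = ownerOf.apply(key);
        if (owner == null) {
            return "404";                  // owner unknown, e.g. mid-rebalance
        }
        if (!owner.equals(self)) {
            return "303 " + owner;         // redirect to the current host
        }
        return value == null ? "404" : "200 " + value;
    }

    public static void main(String[] args) {
        Function<String, Long> get =
                k -> "hello".equals(k) ? Long.valueOf(2L) : null;
        // Key still owned by this instance: prints "200 2".
        System.out.println(handle("hello", "localhost:2030", get,
                k -> "localhost:2030"));
        // Key moved to the other instance: prints "303 localhost:2031".
        System.out.println(handle("hello", "localhost:2030", get,
                k -> "localhost:2031"));
    }
}
```

As noted above, a valid read may be discarded when the key moves right after
the get; the sketch accepts that in exchange for never serving a value from
an instance that no longer owns the key.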

Best,
Mikael


Re: Queryable state client read guarantees

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Mikael,

Just adding to Damian's comment above: the InvalidStateStoreException
here is thrown to indicate a "transient" state where the state store
hosting this key is being migrated and hence not available. Users
implementing REST APIs on top of it can choose how to handle it, for
example by returning a sentinel value meaning "key not available" or by
returning an error code.

Guozhang





-- 
-- Guozhang

Re: Queryable state client read guarantees

Posted by Damian Guy <da...@gmail.com>.
Hi Mikael,

During a rebalance both instances should throw InvalidStateStoreException
until the rebalance has completed. Once the rebalance has completed, if the
key is not found in the local store, you would get a null value. You
can always find the Kafka Streams instance that will have that key
(assuming it exists) by using:

StreamsMetadata KafkaStreams.metadataForKey(String storeName, K key,
Serializer<K> keySerializer)

The StreamsMetadata will tell you which instance, via HostInfo, has the
given key.

HTH,
Damian





Re: Queryable state client read guarantees

Posted by Mikael Högqvist <ho...@gmail.com>.
Hi Damian,

thanks for fixing this so quickly, I re-ran the test and it works fine.

The next test I tried was to read from two service instances implementing
the same string count topology. First, the client is started and sends two
read requests, one per instance, every second. Next, I start the first
instance and let it complete the store init before the next instance is
started.

Below is the initial part of the trace when going from 0 to 1 instance. The
trace log has the following columns: request id, instance, response code
and value.

3,localhost:2030,503,
3,localhost:2031,503,
4,localhost:2030,503,
4,localhost:2031,503,
5,localhost:2030,200,2
5,localhost:2031,503,
6,localhost:2030,200,2
6,localhost:2031,503,

Before the instance is started, both return 503, the status returned by the
client when it cannot connect to an instance. When the first instance has
started it returns the expected value 2 for request pair 5, 6 and so on.
The trace below is from when the second instance starts.

18,localhost:2030,200,2
18,localhost:2031,503,
19,localhost:2030,404,
19,localhost:2031,503,
20,localhost:2030,404,
20,localhost:2031,503,
21,localhost:2030,404,
21,localhost:2031,200,2
22,localhost:2030,404,
22,localhost:2031,200,2

The new instance takes over responsibility for the partition containing the
key "hello". During this period the new instance returns 503 as expected
until the store is ready. The issue is that the first instance that stored
the value starts returning 404 from request pair 19. A client doing
requests for this key would then have the following sequence:

18 -> 2
19 -> Not found
20 -> Not found
21 -> 2

From the client perspective, I think this violates the guarantee of always
reading the latest value.
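
The per-request view above can be derived mechanically from the trace. The
following is a minimal sketch, assuming the four comma-separated columns
described earlier (request id, instance, response code, value) and assuming
that the client accepts the value of any 200 response for a request; the
names TraceViewSketch and clientView are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TraceViewSketch {

    // Collapses trace lines "requestId,instance,code,value" into one result
    // per request: the value of a 200 response if any instance returned
    // one, otherwise "Not found".
    static Map<Integer, String> clientView(String[] traceLines) {
        Map<Integer, String> view = new LinkedHashMap<>();
        for (String line : traceLines) {
            String[] cols = line.split(",", -1);   // keep the empty value column
            int requestId = Integer.parseInt(cols[0]);
            view.putIfAbsent(requestId, "Not found");
            if ("200".equals(cols[2])) {
                view.put(requestId, cols[3]);
            }
        }
        return view;
    }

    public static void main(String[] args) {
        String[] trace = {
            "18,localhost:2030,200,2", "18,localhost:2031,503,",
            "19,localhost:2030,404,",  "19,localhost:2031,503,",
            "20,localhost:2030,404,",  "20,localhost:2031,503,",
            "21,localhost:2030,404,",  "21,localhost:2031,200,2",
        };
        // Prints one "id -> result" line per request, in trace order.
        clientView(trace).forEach((id, v) -> System.out.println(id + " -> " + v));
    }
}
```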

Am I making the wrong assumptions or is there some way to detect that the
local store is not responsible for the key anymore?

Best,
Mikael


Re: Queryable state client read guarantees

Posted by Damian Guy <da...@gmail.com>.
Hi Mikael,

A fix for KAFKA-4123 <https://issues.apache.org/jira/browse/KAFKA-4123> (the
issue you found with receiving null values) has now been committed to
trunk. I've tried it with your github repo and it appears to be working.
You will have to make a small change to your code as we now throw
InvalidStateStoreException when the Stores are unavailable (previously we
returned null).
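
A minimal sketch of what such a change could look like on the caller's
side, under assumptions: StoreNotReady is a local stand-in for
org.apache.kafka.streams.errors.InvalidStateStoreException so that the
sketch compiles without Kafka on the classpath, lookup stands in for the
call that obtains the store, and the bounded retry with a fixed backoff is
an illustrative policy, not something the fix prescribes.

```java
import java.util.function.Supplier;

final class StoreRetrySketch {

    // Local stand-in for InvalidStateStoreException (an unchecked exception).
    static final class StoreNotReady extends RuntimeException { }

    // Calls lookup until it stops throwing StoreNotReady or the attempts
    // run out; in the latter case the last exception is rethrown.
    static <S> S waitForStore(Supplier<S> lookup, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        StoreNotReady last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (StoreNotReady e) {
                last = e;                      // transient: store not ready yet
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;                     // give up when interrupted
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String store = waitForStore(() -> {
            if (++calls[0] < 3) {
                throw new StoreNotReady();     // first two lookups fail
            }
            return "store-handle";
        }, 5, 100L);
        System.out.println(store + " after " + calls[0] + " attempts");
    }
}
```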

We added a test here
<https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java#L431>
to make sure we only get a value once the store has been (re-)initialized.
Please give it a go and thanks for your help in finding this issue.

Thanks,
Damian


Re: Queryable state client read guarantees

Posted by Mikael Högqvist <ho...@gmail.com>.
Hi Damian,

> > > Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> > > Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> > > hello -> 10
> >
> >
> The case where you get KeyNotFound looks like a bug to me. This shouldn't
> happen. I can see why it might happen and we will create a JIRA and fix it
> right away.
>

Great, thanks for looking into this. I'll try again once the PR is merged.


>
> I'm not sure how you end up with (hello -> 10). It could indicate that the
> offsets for the topic you are consuming from weren't committed so the data
> gets processed again on the restart.
>

Yes, it didn't commit the offsets since streams.close() was not called on
ctrl-c. Fixed by adding a shutdown hook.
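
A minimal sketch of such a shutdown hook, under assumptions: an
AutoCloseable stands in for the KafkaStreams instance so that the sketch
runs without Kafka on the classpath, and the names ShutdownHookSketch and
closeOnShutdown are hypothetical. In the real application the hook body
would call streams.close().

```java
final class ShutdownHookSketch {

    // Registers a JVM shutdown hook that closes the given resource, so
    // that close() also runs when the process is stopped with ctrl-c.
    static Thread closeOnShutdown(AutoCloseable resource) {
        Thread hook = new Thread(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "streams-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        // The lambda stands in for the streams instance to be closed.
        closeOnShutdown(() -> System.out.println("closed"));
        System.out.println("running; the hook fires when the JVM exits");
    }
}
```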

Thanks,
Mikael


> Thanks,
> Damian
>

Re: Queryable state client read guarantees

Posted by Damian Guy <da...@gmail.com>.
Hi Mikael,


> > Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> > Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> > hello -> 10
>
>
The case where you get KeyNotFound looks like a bug to me. This shouldn't
happen. I can see why it might happen and we will create a JIRA and fix it
right away.

I'm not sure how you end up with (hello -> 10). It could indicate that the
offsets for the topic you are consuming from weren't committed so the data
gets processed again on the restart.

Thanks,
Damian