Posted to users@kafka.apache.org by Jon Yeargers <jo...@cedexis.com> on 2017/03/28 21:37:05 UTC

Understanding ReadOnlyWindowStore.fetch

I'm probing around trying to find a way to solve my aggregation -> db issue.
Looking at the '.fetch()' function, I'm wondering about the 'timeFrom' and
'timeTo' params, as not a lot is mentioned about 'proper' usage.

The test in
https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java#L200-L212
makes it appear that the params are boundaries and that it will return an
inclusive list of every key/window combination. Truth?

My tests to this end haven't returned anything.

I'm watching the values coming out of the KTable<Window, String> so I can
send them back as request params. What I've tried:

- Window.key(), Window.key().start() and Window.key().end()
- Window.key(), (Window.key().start() - 1) and (Window.key().end() + 1)
- Window.key(), 0 and Window.key().end()
- Window.key(), 0 and (Window.key().end() + 1)

None of these seem to hit anything in the StateStore.

Is there a delay before Store values become available for '.fetch()'?
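Damian Guy's reply further down this thread explains that `timeFrom`/`timeTo` bound the window *start* time, inclusively, for the given key. The Java sketch below models only that matching rule, using a `TreeMap` keyed by window start for a single key. `ToyWindowStore` and `FetchSemanticsDemo` are hypothetical stand-ins, not Kafka's `ReadOnlyWindowStore` or its RocksDB implementation, and the model ignores retention and multiple instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical toy model of a window store for ONE key: window start -> aggregate.
 * Only models the matching rule for fetch(key, timeFrom, timeTo); not Kafka code.
 */
class ToyWindowStore {
    private final TreeMap<Long, String> windowsByStart = new TreeMap<>();

    void put(long windowStart, String value) {
        windowsByStart.put(windowStart, value);
    }

    /** Values of windows whose START lies in [timeFrom, timeTo], both bounds inclusive. */
    List<String> fetch(long timeFrom, long timeTo) {
        if (timeFrom > timeTo) {
            return new ArrayList<>();  // TreeMap.subMap would throw on an inverted range
        }
        NavigableMap<Long, String> hits = windowsByStart.subMap(timeFrom, true, timeTo, true);
        return new ArrayList<>(hits.values());  // ascending by window start
    }
}

class FetchSemanticsDemo {
    public static void main(String[] args) {
        long min = 60_000L;
        ToyWindowStore store = new ToyWindowStore();
        // Three hopping windows for one key, starting 5 minutes apart.
        store.put(0L, "w@0");
        store.put(5 * min, "w@5");
        store.put(10 * min, "w@10");

        // Using a window's own start for both bounds selects exactly that window.
        System.out.println(store.fetch(5 * min, 5 * min));       // [w@5]
        // Window end times play no part: any range covering a start matches it.
        System.out.println(store.fetch(0L, 70 * min));           // [w@0, w@5, w@10]
        // A range beginning after a window's start excludes it, even though it overlaps it.
        System.out.println(store.fetch(5 * min + 1, 70 * min));  // [w@10]
    }
}
```

Under this model, each of the four ranges listed in the question covers the window's own start, so each should have matched a window that was still stored. An empty result would then point at something other than the bounds, such as expiry or querying the wrong instance, both of which come up later in the thread.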

Re: Understanding ReadOnlyWindowStore.fetch

Posted by Michael Noll <mi...@confluent.io>.
Jon,

perhaps you could share the full integration test (or whatever code you're
using to experiment)?  We had a similar "how does X work?" question on
StackOverflow recently [1], and it was much easier to help once we e.g.
also understood how the test data was exactly being generated.

-Michael




[1]
https://stackoverflow.com/questions/43038653/how-to-actually-discard-late-records


On Thu, Mar 30, 2017 at 1:53 AM, Jon Yeargers <jo...@cedexis.com>
wrote:

> I remain more than mystified about the workings of the StateStore. I tried
> making aggregations with a 1-minute window, 10-second advance and a _12
> hour_ retention (which is longer than the retention.ms of the topic). I
> still couldn't get more than a 15% hit rate on the StateStore.
>
> Are there configuration settings? Some properties file to set up RocksDB?
> I'm not getting any errors - just not getting any data.
>
> On Wed, Mar 29, 2017 at 12:52 PM, Jon Yeargers <jo...@cedexis.com>
> wrote:
>
> > So is '.until()' based on clock time, elapsed time (i.e. record age),
> > or something else?
> >
> > The fact that I'm seeing lots of records come through that can't be
> > found in the Store - are these 'old' and already expired?
> >
> > Going forward, it would be useful to have different forms of
> > '.until()' so one could consume old records (e.g. if one was catching
> > up from lag) without having to worry about them immediately
> > disappearing.
> >
> > On Wed, Mar 29, 2017 at 10:37 AM, Damian Guy <da...@gmail.com>
> > wrote:
> >
> > > Jon,
> > >
> > > You should be able to query anything that has not expired, i.e.,
> > > based on TimeWindows.until(..).
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Wed, 29 Mar 2017 at 17:24 Jon Yeargers <jo...@cedexis.com>
> > > wrote:
> > >
> > > > To be a bit more specific:
> > > >
> > > > If I call this:
> > > >
> > > >     KTable<Window, String> kt =
> > > >         sourceStream.groupByKey().reduce(..., "somekeystore");
> > > >
> > > > and then call this:
> > > >
> > > >     kt.forEach()-> ...
> > > >
> > > > Can I assume that everything that comes out will be available in
> > > > "somekeystore"? If not, what subset should I expect to find there?
> > > >
> > > > On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers
> > > > <jon.yeargers@cedexis.com> wrote:
> > > >
> > > > > But if a key shows up in a KTable->forEach, should it be
> > > > > available in the StateStore (from the KTable)?
> > > > >
> > > > > On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll
> > > > > <michael@confluent.io> wrote:
> > > > >
> > > > > > Jon,
> > > > > >
> > > > > > there's a related example, using a window store and a key-value
> > > > > > store, at
> > > > > > https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
> > > > > > (this is for Confluent 3.2 / Kafka 0.10.2).
> > > > > >
> > > > > > -Michael
> > > > > >
> > > > > > On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers
> > > > > > <jon.yeargers@cedexis.com> wrote:
> > > > > >
> > > > > > > I'm only running one instance (locally) to keep things simple.
> > > > > > >
> > > > > > > Reduction:
> > > > > > >
> > > > > > >     KTable<Windowed<String>, String> hourAggStore =
> > > > > > >         sourceStream.groupByKey().reduce(rowReducer,
> > > > > > >             TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * 1000).until(70 * 60 * 1000L),
> > > > > > >             "HourAggStore");
> > > > > > >
> > > > > > > then I get values to look for via:
> > > > > > >
> > > > > > >     hourAggStore.foreach((k, v) -> {
> > > > > > >         LogLine logLine = objectMapper.readValue(v, LogLine.class);
> > > > > > >         LOGGER.debug("{}", k.key());
> > > > > > >     });
> > > > > > >
> > > > > > > I've kept it easy by requesting everything from 0 to
> > > > > > > 'System.currentTimeMillis()'. Retrieval is done using a snip
> > > > > > > from your sample code "windowedByKey".
> > > > > > >
> > > > > > > Requests are sent in via curl and output through the same
> > > > > > > channel. I pass in the key and ask for any values.
> > > > > > >
> > > > > > > I've looked at the values passed in / out of the reduction
> > > > > > > function and they look sane.
> > > > > > >
> > > > > > > My assumption is that if a value shows up in the 'forEach'
> > > > > > > loop, this implies it exists in the StateStore. Accurate?
> > > > > > >
> > > > > > > In fact, only about one in 10 requests actually return any
> > > > > > > values. No errors - just no data.
> > > > > > >
> > > > > > > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy
> > > > > > > <damian.guy@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Jon,
> > > > > > > >
> > > > > > > > If you are able to get a handle on the store, i.e., via
> > > > > > > > KafkaStreams.store(...) and call fetch without any
> > > > > > > > exceptions, then the store is available.
> > > > > > > > The time params to fetch are the boundaries to search for
> > > > > > > > windows for the given key. They relate to the start time of
> > > > > > > > the window, so if you did fetch(key, t1, t2) - it will find
> > > > > > > > all the windows for key that start in the inclusive time
> > > > > > > > range t1 - t2.
> > > > > > > >
> > > > > > > > Are you running more than one instance? If yes, then you
> > > > > > > > want to make sure that you are querying the correct
> > > > > > > > instance. For that you can use:
> > > > > > > > KafkaStreams.metadataForKey(...) to find the instance that
> > > > > > > > has the key you are looking for.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Damian
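Jon's reduction quoted above uses `TimeWindows.of(65 min).advanceBy(5 min)`, i.e. overlapping (hopping) windows, so a single input record updates several windows, each with its own start time. The sketch below assumes the epoch-aligned hopping rule (window starts are multiples of the advance, and each window covers `[start, start + size)`) and lists the start of every window that contains a given timestamp. `HoppingWindows` is a hypothetical helper, not Kafka's `TimeWindows` class.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of epoch-aligned hopping windows: each window is [start, start + size). */
class HoppingWindows {
    /** Starts of every window (given size/advance) that contains timestamp t, for t >= 0. */
    static List<Long> windowStartsFor(long t, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still contains t; clamped at 0 for early timestamps.
        long start = (Math.max(0L, t - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= t; start += advanceMs) {
            starts.add(start);  // start <= t < start + sizeMs holds for each of these
        }
        return starts;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        long t = 100 * min;
        // 65-minute windows advancing by 5 minutes (the reduction quoted above).
        System.out.println(windowStartsFor(t, 65 * min, 5 * min).size());  // 13
        // 1-minute windows advancing by 10 seconds.
        System.out.println(windowStartsFor(t, min, 10_000L).size());       // 6
    }
}
```

So each record yields 13 windowed updates with the 65/5-minute setting, and 6 with the 1-minute/10-second setting Jon mentions elsewhere in the thread; each of those windows is a separate entry that `fetch` returns only if its start lies within the requested range.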

Re: Understanding ReadOnlyWindowStore.fetch

Posted by Jon Yeargers <jo...@cedexis.com>.
I remain more than mystified about the workings of the StateStore. I tried
making aggregations with a 1-minute window, 10-second advance and a _12
hour_ retention (which is longer than the retention.ms of the topic). I
still couldn't get more than a 15% hit rate on the StateStore.

Are there configuration settings? Some properties file to set up RocksDB? I'm
not getting any errors - just not getting any data.


Re: Understanding ReadOnlyWindowStore.fetch

Posted by "Matthias J. Sax" <ma...@confluent.io>.
It's based on "stream time", i.e., the internally tracked progress based
on the timestamps returned by the TimestampExtractor.

-Matthias

On 3/29/17 12:52 PM, Jon Yeargers wrote:
> So is '.until()' based on clock time, elapsed time (i.e. record age), or
> something else?
>
> The fact that I'm seeing lots of records come through that can't be found in
> the Store - are these 'old' and already expired?
>
> Going forward, it would be useful to have different forms of '.until()' so
> one could consume old records (e.g. if one was catching up from lag) without
> having to worry about them immediately disappearing.
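Matthias's point, that `until()` retention is measured in stream time rather than wall-clock time, can be illustrated with a small model. All of the following are simplifying assumptions: stream time is taken as the largest record timestamp seen so far (Kafka tracks it per task, and the exact rule has varied between versions), and a window counts as expired once its start is older than `streamTime - retention`. The real RocksDB-backed store drops whole segments, so windows may linger somewhat longer than this rule suggests. `StreamTimeRetention` is a hypothetical class.

```java
/**
 * Hypothetical model of stream-time-based window retention. Simplifications:
 * stream time = largest record timestamp seen so far (assumed non-negative), and a
 * window is expired once its start is older than streamTime - retentionMs.
 * Kafka's real store drops whole segments, so windows can outlive this rule.
 */
class StreamTimeRetention {
    private final long retentionMs;
    private long streamTime = 0L;  // no records seen yet; timestamps assumed >= 0

    StreamTimeRetention(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Advance stream time with a record's extracted timestamp; it never moves backwards. */
    void observe(long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
    }

    long streamTime() {
        return streamTime;
    }

    /** True if a window starting at windowStart would still be queryable under this model. */
    boolean isRetained(long windowStart) {
        return windowStart >= streamTime - retentionMs;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        long dayAgo = System.currentTimeMillis() - 24 * 60 * min;
        StreamTimeRetention r = new StreamTimeRetention(70 * min);  // like until(70 min)

        // Replaying a day-old backlog: stream time follows the records, not the clock.
        r.observe(dayAgo);
        System.out.println(r.isRetained(dayAgo));                     // true
        // Two hours of record time later, that window is past retention.
        r.observe(dayAgo + 120 * min);
        System.out.println(r.isRetained(dayAgo));                     // false
        // A late record with an old timestamp does not move stream time back.
        r.observe(dayAgo);
        System.out.println(r.streamTime() == dayAgo + 120 * min);     // true
    }
}
```

This bears on the catch-up scenario Jon raises in this thread: replaying an old backlog does not by itself expire anything, because stream time follows the records, but a fast replay advances stream time quickly. With `until(70 min)`, a window stops being queryable roughly 70 minutes of record time after its start, which may be only moments of wall-clock time during catch-up.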


Re: Understanding ReadOnlyWindowStore.fetch

Posted by Jon Yeargers <jo...@cedexis.com>.
So is '.until()' based on clock time, elapsed time (i.e. record age), or
something else?

The fact that I'm seeing lots of records come through that can't be found in
the Store - are these 'old' and already expired?

Going forward, it would be useful to have different forms of '.until()' so
one could consume old records (e.g. if one was catching up from lag) without
having to worry about them immediately disappearing.

On Wed, Mar 29, 2017 at 10:37 AM, Damian Guy <da...@gmail.com> wrote:

> Jon,
>
> You should be able to query anything that has not expired, i.e., based on
> TimeWindows.until(..).
>
> Thanks,
> Damian

Re: Understanding ReadOnlyWindowStore.fetch

Posted by Damian Guy <da...@gmail.com>.
Jon,

You should be able to query anything that has not expired, i.e., based on
TimeWindows.until(..).

Thanks,
Damian

On Wed, 29 Mar 2017 at 17:24 Jon Yeargers <jo...@cedexis.com> wrote:

> To be a bit more specific:
>
> If I call this:         KTable<Window, String> kt =
> sourceStream.groupByKey().reduce(..., "somekeystore");
>
> and then call this:
>
> kt.forEach()-> ...
>
> Can I assume that everything that comes out will be available in
> "somekeystore"? If not, what subset should I expect to find there?

Re: Understanding ReadOnlyWindowStore.fetch

Posted by Jon Yeargers <jo...@cedexis.com>.
To be a bit more specific:

If I call this:

        KTable<Windowed<String>, String> kt =
                sourceStream.groupByKey().reduce(..., "somekeystore");

and then call this:

        kt.foreach((key, value) -> ...);

Can I assume that everything that comes out will be available in
"somekeystore"? If not, what subset should I expect to find there?

On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers <jo...@cedexis.com>
wrote:

> But if a key shows up in a KTable->forEach should it be available in the
> StateStore (from the KTable)?

Re: Understanding ReadOnlyWindowStore.fetch

Posted by Jon Yeargers <jo...@cedexis.com>.
But if a key shows up in KTable.foreach(), should it be available in the
StateStore (from the KTable)?

On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll <mi...@confluent.io> wrote:

> Jon,
>
> there's a related example, using a window store and a key-value store, at
> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
> (this is for Confluent 3.2 / Kafka 0.10.2).
>
> -Michael

Re: Understanding ReadOnlyWindowStore.fetch

Posted by Michael Noll <mi...@confluent.io>.
Jon,

there's a related example, using a window store and a key-value store, at
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
(this is for Confluent 3.2 / Kafka 0.10.2).

-Michael



On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers <jo...@cedexis.com>
wrote:

> I'm only running one instance (locally) to keep things simple.
>
> Reduction:
>
>         KTable<Windowed<String>, String> hourAggStore =
>                 sourceStream.groupByKey().reduce(rowReducer,
>                         TimeWindows.of(65 * 60 * 1000L)
>                                 .advanceBy(5 * 60 * 1000L)
>                                 .until(70 * 60 * 1000L),
>                         "HourAggStore");
>
> then I get values to look for via:
>
>         hourAggStore.foreach((k, v) -> {
>                 try {
>                         LogLine logLine = objectMapper.readValue(v, LogLine.class);
>                         LOGGER.debug("{}", k.key());
>                 } catch (IOException e) {
>                         LOGGER.warn("Could not parse value for key {}", k.key(), e);
>                 }
>         });
>
> I've kept it simple by requesting everything from 0 to
> System.currentTimeMillis(). Retrieval is done using a snippet from your
> sample code ("windowedByKey").
>
> Requests are sent in via curl and the output comes back through the same
> channel. I pass in the key and ask for any values.
>
> I've looked at the values passed in / out of the reduction function and they
> look sane.
>
> My assumption is that if a value shows up in the 'foreach' loop, that
> implies it exists in the StateStore. Accurate?
>
> In fact, only about one in 10 requests actually returns any values. No
> errors, just no data.

Re: Understanding ReadOnlyWindowStore.fetch

Posted by Jon Yeargers <jo...@cedexis.com>.
I'm only running one instance (locally) to keep things simple.

Reduction:

        KTable<Windowed<String>, String> hourAggStore =
                sourceStream.groupByKey().reduce(rowReducer,
                        TimeWindows.of(65 * 60 * 1000L)
                                .advanceBy(5 * 60 * 1000L)
                                .until(70 * 60 * 1000L),
                        "HourAggStore");
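As a rough illustration of what this window definition produces, here is a toy model (not Kafka Streams code). With TimeWindows.of(65 min).advanceBy(5 min), each record falls into 65 / 5 = 13 overlapping windows. Assuming Kafka's hopping windows are epoch-aligned (every window start is a multiple of the advance, each window covering [start, start + size)), the sketch below computes those start times. `windowStartsFor` and the class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of hopping-window assignment, NOT Kafka Streams code.
// Assumption: windows are epoch-aligned, so every start is a multiple of advanceMs,
// and each window covers [start, start + sizeMs).
public class HoppingWindowSketch {

    // Hypothetical helper: start times (ms) of every window that contains the timestamp.
    public static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still reaches the timestamp.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 65 * 60 * 1000L;           // TimeWindows.of(65 minutes)
        long advance = 5 * 60 * 1000L;         // .advanceBy(5 minutes)
        long recordTime = 2 * 60 * 60 * 1000L; // a record stamped 02:00 after the epoch

        List<Long> starts = windowStartsFor(recordTime, size, advance);
        // prints "13 windows, starts from 3600000 to 7200000"
        System.out.println(starts.size() + " windows, starts from " + starts.get(0)
                + " to " + starts.get(starts.size() - 1));
    }
}
```

Each of those start times is a separate window entry for the key in "HourAggStore", and it is the window start that fetch's time range is compared against.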

then I get values to look for via:

        hourAggStore.foreach((k, v) -> {
                try {
                        LogLine logLine = objectMapper.readValue(v, LogLine.class);
                        LOGGER.debug("{}", k.key());
                } catch (IOException e) {
                        LOGGER.warn("Could not parse value for key {}", k.key(), e);
                }
        });

I've kept it simple by requesting everything from 0 to
System.currentTimeMillis(). Retrieval is done using a snippet from your
sample code ("windowedByKey").

Requests are sent in via curl and the output comes back through the same
channel. I pass in the key and ask for any values.

I've looked at the values passed in / out of the reduction function and they
look sane.

My assumption is that if a value shows up in the 'foreach' loop, that
implies it exists in the StateStore. Accurate?

In fact, only about one in 10 requests actually returns any values. No
errors, just no data.



On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy <da...@gmail.com> wrote:

> Hi Jon,
>
> If you are able to get a handle on the store, i.e., via
> KafkaStreams.store(...) and call fetch without any exceptions, then the
> store is available.
> The time params to fetch are the boundaries to search for windows for the
> given key. They relate to the start time of the window, so if you call
> fetch(key, t1, t2), it will find all the windows for key that start in the
> inclusive time range [t1, t2].
>
> Are you running more than one instance? If yes, then you want to make sure
> that you are querying the correct instance. For that, you can use
> KafkaStreams.metadataForKey(...) to find the instance that has the key you
> are looking for.
>
> Thanks,
> Damian

Re: Understanding ReadOnlyWindowStore.fetch

Posted by Damian Guy <da...@gmail.com>.
Hi Jon,

If you are able to get a handle on the store, i.e., via
KafkaStreams.store(...) and call fetch without any exceptions, then the
store is available.
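A minimal sketch of that "get a handle, and retry if it isn't ready yet" idea, written as a generic helper (hypothetical; not part of Kafka Streams). It assumes that, while a store is not yet queryable (for example during startup or a rebalance), KafkaStreams.store(...) throws InvalidStateStoreException; the helper therefore retries only on a caller-supplied exception type and rethrows anything else. `NotReadyException` is a stand-in for that exception so the sketch runs without Kafka on the classpath.

```java
import java.util.function.Supplier;

// Hypothetical generic helper, NOT part of Kafka Streams: call the supplier until it
// stops throwing retryOn, giving up after maxAttempts.
public class StoreWait {

    // Stand-in for "store not queryable yet" (InvalidStateStoreException in Kafka Streams).
    public static class NotReadyException extends RuntimeException {
        public NotReadyException(String message) {
            super(message);
        }
    }

    public static <T> T waitUntilAvailable(Supplier<T> supplier,
                                           Class<? extends RuntimeException> retryOn,
                                           long sleepMs,
                                           int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return supplier.get();
            } catch (RuntimeException e) {
                if (!retryOn.isInstance(e)) {
                    throw e; // unrelated failure: surface it instead of retrying
                }
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMs); // give the store time to finish initializing
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", ie);
                }
            }
        }
        throw new IllegalStateException("not available after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated lookup that is "not ready" for the first two calls.
        String handle = waitUntilAvailable(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new NotReadyException("store still initializing");
            }
            return "HourAggStore handle";
        }, NotReadyException.class, 10, 5);
        // prints "HourAggStore handle after 3 calls"
        System.out.println(handle + " after " + calls[0] + " calls");
    }
}
```

With Kafka Streams on the classpath, the supplier would be something like `() -> streams.store("HourAggStore", QueryableStoreTypes.<String, String>windowStore())` with `InvalidStateStoreException.class` as the retry type; the sleep interval and attempt count here are arbitrary.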
The time params to fetch are the boundaries to search for windows for the
given key. They relate to the start time of the window, so if you call
fetch(key, t1, t2), it will find all the windows for key that start in the
inclusive time range [t1, t2].
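A toy in-memory model of these semantics (a hypothetical class, not Kafka's window store implementation): each key's windows are indexed by window start, and fetch(key, timeFrom, timeTo) returns the windows whose start lies in [timeFrom, timeTo], inclusive. As far as I know, the real ReadOnlyWindowStore.fetch returns a WindowStoreIterator whose entries are keyed by the window start timestamp; the NavigableMap returned here stands in for that.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy in-memory model of window-store lookups, NOT Kafka's implementation.
// Each key's windows are indexed by window START time (ms).
public class ToyWindowStore {

    private final Map<String, NavigableMap<Long, String>> windowsByKey = new HashMap<>();

    public void put(String key, long windowStartMs, String value) {
        windowsByKey.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStartMs, value);
    }

    // Windows of `key` whose start lies in [timeFromMs, timeToMs], both ends inclusive.
    // The returned map (start -> value) plays the role of the window-start-keyed iterator.
    public NavigableMap<Long, String> fetch(String key, long timeFromMs, long timeToMs) {
        NavigableMap<Long, String> windows = windowsByKey.get(key);
        if (windows == null || timeFromMs > timeToMs) {
            return new TreeMap<>(); // unknown key or empty range: no results, no error
        }
        return new TreeMap<>(windows.subMap(timeFromMs, true, timeToMs, true));
    }

    public static void main(String[] args) {
        ToyWindowStore store = new ToyWindowStore();
        long advance = 5 * 60 * 1000L;
        // 13 hopping windows for one key, starting at 01:00 through 02:00 (ms since epoch).
        for (long start = 3_600_000L; start <= 7_200_000L; start += advance) {
            store.put("user-1", start, "agg@" + start);
        }

        // prints 13: every window starts somewhere in [0, now]
        System.out.println(store.fetch("user-1", 0L, System.currentTimeMillis()).size());
        // prints 1: an exact window start as both bounds hits exactly that window
        System.out.println(store.fetch("user-1", 3_600_000L, 3_600_000L).size());
        // prints 0: these times fall inside the last window, but no window STARTS in the range
        System.out.println(store.fetch("user-1", 7_500_000L, 7_900_000L).size());
    }
}
```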

Are you running more than one instance? If yes, then you want to make sure
that you are querying the correct instance. For that, you can use
KafkaStreams.metadataForKey(...) to find the instance that has the key you
are looking for.
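A hypothetical sketch of the "query locally or forward" decision this implies. Each instance only holds the windows for the partitions it owns, so a local fetch for a key owned by another instance simply has nothing to return rather than failing. In a real application the owner would come from KafkaStreams.metadataForKey(...) (its StreamsMetadata carries the host and port); here `ownerOf` is a stand-in that assigns keys with String.hashCode(), which is NOT how Kafka's default partitioner works (it hashes the serialized key bytes with murmur2), and the `/windowed/...` URL path is made up.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, NOT the Kafka Streams API. ownerOf() stands in for
// KafkaStreams.metadataForKey(...) and uses String.hashCode(), unlike Kafka's
// default partitioner (murmur2 over the serialized key bytes).
public class QueryRouter {

    private final List<String> instances; // "host:port" of every application instance
    private final String self;            // "host:port" of this instance

    public QueryRouter(List<String> instances, String self) {
        this.instances = instances;
        this.self = self;
    }

    public String ownerOf(String key) {
        return instances.get(Math.floorMod(key.hashCode(), instances.size()));
    }

    // "local" if this instance owns the key; otherwise a (made-up) URL on the owning
    // instance that the request should be forwarded to.
    public String route(String key, String storeName) {
        String owner = ownerOf(key);
        return owner.equals(self) ? "local" : "http://" + owner + "/windowed/" + storeName + "/" + key;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("app-a:7070", "app-b:7070");
        QueryRouter onA = new QueryRouter(hosts, "app-a:7070");
        for (String key : Arrays.asList("user-1", "user-2", "user-3")) {
            System.out.println(key + " -> " + onA.route(key, "HourAggStore"));
        }
    }
}
```

With a single instance, as in a local test, every key routes to "local", so this particular pitfall only appears once more instances join.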

Thanks,
Damian



On Tue, 28 Mar 2017 at 22:37 Jon Yeargers <jo...@cedexis.com> wrote:

> I'm probing around trying to find a way to solve my aggregation -> db issue.
> Looking at the '.fetch()' function, I'm wondering about the 'timeFrom' and
> 'timeTo' params, as not much is said about their 'proper' usage.
>
> The test in
>
> https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java#L200-L212
> makes it appear that the params are boundaries and that it will return an
> inclusive list of every key/window combination. Truth?
>
> My tests to this end haven't returned anything.
>
> I'm watching the values coming out of the KTable<Windowed<String>, String> so
> I can send them back as request params. What I've tried:
>
> - windowed.key(), windowed.window().start() and windowed.window().end()
> - windowed.key(), (windowed.window().start() - 1) and (windowed.window().end() + 1)
> - windowed.key(), 0 and windowed.window().end()
> - windowed.key(), 0 and (windowed.window().end() + 1)
>
> None of these seem to hit anything in the StateStore.
>
> Is there a delay before Store values become available for '.fetch()'?
>