Posted to users@kafka.apache.org by Jon Yeargers <jo...@cedexis.com> on 2016/12/06 23:33:01 UTC

accessing state-store ala WordCount example

I copied out some of the WordCountInteractive demo code
<https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java>
to see how the REST access works. I have an aggregator:

groupByKey().aggregate(LogLine::new,
    new aggregate(),
    TimeWindows.of(60 * 60 * 1000L),
    collectorSerde, "agg_stream");


I incorporated the Jetty-server bits from the sample. When I run it I can
see results via the '/states/instances' entry point but nothing from the
'/states/keyvalues/agg_stream/all'.

The aggregator is churning away so I'd assume the state store would have
plenty of key/value pairs but it comes up empty.

What's the proper way to use this?

Re: accessing state-store ala WordCount example

Posted by Eno Thereska <en...@gmail.com>.
Hi Jon,

The "/windowed" name in the web server example is just an example name; it could have been called something else. It is, however, built on the Interactive Query APIs, which are fixed. The example code I mentioned implements it as shown below. Again, the web server code is just a reference implementation that you can change or rename freely (as long as you call store.fetch etc. correctly).



  @GET()
  @Path("/windowed/{storeName}/{key}/{from}/{to}")
  @Produces(MediaType.APPLICATION_JSON)
  public List<KeyValueBean> windowedByKey(@PathParam("storeName") final String storeName,
                                          @PathParam("key") final String key,
                                          @PathParam("from") final Long from,
                                          @PathParam("to") final Long to) {

    // Lookup the WindowStore with the provided storeName
    final ReadOnlyWindowStore<String, Long> store = streams.store(storeName,
                                                                  QueryableStoreTypes.<String, Long>windowStore());
    if (store == null) {
      throw new NotFoundException();
    }

    // fetch the window results for the given key and time range
    final WindowStoreIterator<Long> results = store.fetch(key, from, to);

    final List<KeyValueBean> windowResults = new ArrayList<>();
    while (results.hasNext()) {
      final KeyValue<Long, Long> next = results.next();
      // convert the result to have the window time and the key (for display purposes)
      windowResults.add(new KeyValueBean(key + "@" + next.key, next.value));
    }
    return windowResults;
  }
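
On the from/to question: as far as I understand the API, from and to are plain epoch timestamps in milliseconds, and fetch returns the windows for the key whose start time falls in that range. Tumbling windows created with TimeWindows.of(size) are aligned to the epoch, so a window start can be derived from any timestamp. A minimal sketch, assuming the hourly window from the original post; WindowRange, windowStart, windowedPath and the key "someKey" are made-up names for illustration, and the path prefix depends on how your server mounts the resource:

```java
import java.time.Duration;

class WindowRange {
    // Window size taken from the original post: TimeWindows.of(60 * 60 * 1000L)
    static final long WINDOW_SIZE_MS = 60 * 60 * 1000L;

    // Start of the epoch-aligned tumbling window that contains the timestamp
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Hypothetical helper: builds the path for the example's windowed endpoint
    static String windowedPath(String storeName, String key, long from, long to) {
        return "/windowed/" + storeName + "/" + key + "/" + from + "/" + to;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Cover the current window and the three windows before it
        long from = windowStart(now - Duration.ofHours(3).toMillis());
        long to = now;
        System.out.println(windowedPath("agg_stream", "someKey", from, to));
    }
}
```

Any range wide enough to include the window start times you care about should work; using 0 for from and the current time for to would ask for every window still retained for that key.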


> On 7 Dec 2016, at 11:13, Jon Yeargers <jo...@cedexis.com> wrote:
> 
> I'm having trouble finding documentation on this new feature. Can you point
> me to anything?
> 
> Specifically on how to get available "from/to" values but more generally on
> how to use the "windowed" query.
> 
> On Wed, Dec 7, 2016 at 1:25 AM, Eno Thereska <en...@gmail.com> wrote:
> 
>> Hi Jon,
>> 
>> This will be a windowed store. Have a look at the Jetty-server bits for
>> windowedByKey:
>> "/windowed/{storeName}/{key}/{from}/{to}"
>> 
>> Thanks
>> Eno
>> 
>>> On 6 Dec 2016, at 23:33, Jon Yeargers <jo...@cedexis.com> wrote:
>>> 
>>> I copied out some of the WordCountInteractive
>>> <https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java>
>>> demo
>>> code to see how the REST access works. I have an aggregator
>>> 
>>> groupByKey().aggregate(LogLine::new,
>>>   new aggregate(),
>>>   TimeWindows.of(60 * 60 * 1000L),
>>>   collectorSerde, "agg_stream");
>>> 
>>> 
>>> I incorporated the Jetty-server bits from the sample. When I run it I can
>>> see results via the '/states/instances' entry point but nothing from the
>>> '/states/keyvalues/agg_stream/all'.
>>> 
>>> The aggregator is churning away so I'd assume the state store would have
>>> plenty of key/value pairs but it comes up empty.
>>> 
>>> What's the proper way to use this?
>> 
>> 


Re: accessing state-store ala WordCount example

Posted by Jon Yeargers <jo...@cedexis.com>.
I'm having trouble finding documentation on this new feature. Can you point
me to anything?

Specifically on how to get available "from/to" values but more generally on
how to use the "windowed" query.

On Wed, Dec 7, 2016 at 1:25 AM, Eno Thereska <en...@gmail.com> wrote:

> Hi Jon,
>
> This will be a windowed store. Have a look at the Jetty-server bits for
> windowedByKey:
> "/windowed/{storeName}/{key}/{from}/{to}"
>
> Thanks
> Eno
>
> > On 6 Dec 2016, at 23:33, Jon Yeargers <jo...@cedexis.com> wrote:
> >
> > I copied out some of the WordCountInteractive
> > <https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java>
> > demo
> > code to see how the REST access works. I have an aggregator
> >
> > groupByKey().aggregate(LogLine::new,
> >    new aggregate(),
> >    TimeWindows.of(60 * 60 * 1000L),
> >    collectorSerde, "agg_stream");
> >
> >
> > I incorporated the Jetty-server bits from the sample. When I run it I can
> > see results via the '/states/instances' entry point but nothing from the
> > '/states/keyvalues/agg_stream/all'.
> >
> > The aggregator is churning away so I'd assume the state store would have
> > plenty of key/value pairs but it comes up empty.
> >
> > What's the proper way to use this?
>
>

Re: accessing state-store ala WordCount example

Posted by Eno Thereska <en...@gmail.com>.
Hi Jon,

This will be a windowed store. Have a look at the Jetty-server bits for windowedByKey:
"/windowed/{storeName}/{key}/{from}/{to}"

Thanks
Eno
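
To illustrate what "windowed store" means here: a windowed aggregate keeps one value per key and window, not one value per key, so a lookup needs a key plus a time range. The toy model below is only an in-memory sketch of that idea using plain Java collections. It is not how Kafka Streams implements the store, and ToyWindowStore and its methods are invented names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ToyWindowStore {
    // key -> (window start in ms -> count of records seen in that window)
    private final Map<String, TreeMap<Long, Long>> data = new HashMap<>();
    private final long windowSizeMs;

    ToyWindowStore(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Count one record into the epoch-aligned window containing its timestamp
    void add(String key, long timestampMs) {
        long start = timestampMs - (timestampMs % windowSizeMs);
        data.computeIfAbsent(key, k -> new TreeMap<>()).merge(start, 1L, Long::sum);
    }

    // Return "windowStart=count" for every window of the key whose start
    // lies in [from, to]; requires from <= to
    List<String> fetch(String key, long from, long to) {
        List<String> out = new ArrayList<>();
        TreeMap<Long, Long> windows = data.get(key);
        if (windows == null) {
            return out;
        }
        for (Map.Entry<Long, Long> e : windows.subMap(from, true, to, true).entrySet()) {
            out.add(e.getKey() + "=" + e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        ToyWindowStore store = new ToyWindowStore(60 * 60 * 1000L);
        store.add("a", 10L);
        store.add("a", 20L);
        store.add("a", 3_600_005L);
        // prints [0=2, 3600000=1]
        System.out.println(store.fetch("a", 0L, 3_600_000L));
    }
}
```

The same key shows up once per window, which is why the results in the real example are labelled with the key and the window time together.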

> On 6 Dec 2016, at 23:33, Jon Yeargers <jo...@cedexis.com> wrote:
> 
> I copied out some of the WordCountInteractive
> <https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java>
> demo
> code to see how the REST access works. I have an aggregator
> 
> groupByKey().aggregate(LogLine::new,
>    new aggregate(),
>    TimeWindows.of(60 * 60 * 1000L),
>    collectorSerde, "agg_stream");
> 
> 
> I incorporated the Jetty-server bits from the sample. When I run it I can
> see results via the '/states/instances' entry point but nothing from the
> '/states/keyvalues/agg_stream/all'.
> 
> The aggregator is churning away so I'd assume the state store would have
> plenty of key/value pairs but it comes up empty.
> 
> What's the proper way to use this?