Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2017/07/15 06:07:47 UTC

Do we have to query localWindowStore in same java instance we are creating the store

Hi,
I have created a simple window store to count occurrences of a given key.


My pipeline is:

        // n is the window size in ms; advancing by n gives non-overlapping windows
        TimeWindows windows = TimeWindows.of(n).advanceBy(n).until(30 * n);
        final StateStoreSupplier<WindowStore> supplier = Stores.create("key-table")
                .withKeys(Serdes.String())
                .withValues(Serdes.Long())
                .persistent()
                .enableLogging(topicConfigMap)
                .windowed(windows.size(), windows.maintainMs(), windows.segments, false)
                .build();

        builder.stream(Serdes.String(), valueSerde, "input-topic")
                .groupByKey()
                .count(windows, supplier);


Now as per docs to query the store I would have to use:

String storeName = supplier.name();
ReadOnlyWindowStore<String, Long> localWindowStore =
        streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
String key = "some-key";
long fromTime = ...;
long toTime = ...;
// fetch() takes the same bounds declared above (fromTime / toTime)
WindowStoreIterator<Long> countForWordsForWindows =
        localWindowStore.fetch(key, fromTime, toTime);


My questions are:

1. Can I run a different Java application to query the state store
created by the first application?

If yes, how do I refer to the state store?


2. The value in the state store for any given key keeps incrementing
as we read new data from the topic for a given time period.

So at time t, say, the count for k1 is 5 for a given window. If we
query at that time we get 5. At time t1 the count for the same key and
window has increased to 10, and if we query then we get 10.

How do we make sure that we query the state store only after it has
aggregated all the values for a given window?

And is there a way for that Java application to run forever (just like
the streams application), so that it keeps querying the state store and
reporting the values?


Thanks

Sachin

Re: Do we have to query localWindowStore in same java instance we are creating the store

Posted by Eno Thereska <en...@gmail.com>.
Hi Sachin,

1. You can run a remote query, and we provide some example code (https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/). However, by default Apache Kafka ships with only the local query capabilities; the example above includes code for remote querying.
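
As a rough illustration of the idea (not the code from the linked post), the streams application itself could expose its local store over HTTP, and the second application would call that endpoint instead of touching the store directly. The sketch below uses only the JDK's built-in `com.sun.net.httpserver.HttpServer`. The class name `RemoteCountEndpoint`, the `/count` path, the `key` query parameter, and the `lookup` function are all hypothetical; in a real application `lookup` would presumably wrap a call to the local `ReadOnlyWindowStore`.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class RemoteCountEndpoint {

    // Extracts the value of a leading "key=" parameter; returns "" if absent.
    // Deliberately simplistic: it does not handle multiple query parameters.
    static String parseKey(String rawQuery) {
        if (rawQuery != null && rawQuery.startsWith("key=")) {
            return rawQuery.substring("key=".length());
        }
        return "";
    }

    // Builds the response body. `lookup` is a hypothetical stand-in for a
    // query against the local window store; unknown keys are reported as 0.
    static String respond(Function<String, Long> lookup, String rawQuery) {
        Long count = lookup.apply(parseKey(rawQuery));
        return String.valueOf(count == null ? 0L : count);
    }

    // Starts an HTTP server that answers GET /count?key=<key>.
    public static HttpServer start(int port, Function<String, Long> lookup) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("localhost", port), 0);
        server.createContext("/count", exchange -> {
            byte[] body = respond(lookup, exchange.getRequestURI().getQuery())
                    .getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }
}
```

The querying application would then issue something like `GET http://<host>:<port>/count?key=some-key`. With more than one streams instance, each instance only holds part of the state, so a request may have to be routed to the instance that owns the key.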

2. We don’t have a notion of windows closing in Kafka Streams, so the application needs to decide how frequently to query. Your Java application can, for example, query, sleep, and query again in a while (true) loop.
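
A minimal sketch of that query, sleep, query loop, using only the JDK. `StorePoller`, `poll`, and `isLikelySettled` are hypothetical names, and the `query` supplier is a stand-in for a call such as `localWindowStore.fetch(key, fromTime, toTime)`. The `isLikelySettled` helper is only a heuristic layered on top, under the assumption that a window is unlikely to change once wall-clock time is well past its end; Kafka Streams itself gives no such guarantee.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

public class StorePoller {

    // Repeatedly runs `query`, hands the result to `report`, then sleeps.
    // A negative `maxPolls` means "run until the thread is interrupted".
    // Returns the number of polls performed.
    public static int poll(Supplier<Long> query, Consumer<Long> report,
                           long intervalMs, int maxPolls) {
        int polls = 0;
        while (maxPolls < 0 || polls < maxPolls) {
            report.accept(query.get());
            polls++;
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                break;                              // and stop polling
            }
        }
        return polls;
    }

    // Heuristic only: treats a window as settled once `nowMs` is past
    // windowStartMs + windowSizeMs + graceMs. Late records can still update it.
    public static boolean isLikelySettled(long windowStartMs, long windowSizeMs,
                                          long graceMs, long nowMs) {
        return nowMs >= windowStartMs + windowSizeMs + graceMs;
    }

    public static void main(String[] args) {
        long[] fakeCount = {0}; // simulated store value that grows between polls
        poll(() -> fakeCount[0] += 5,
             count -> System.out.println("count=" + count),
             100L, 3);
    }
}
```

Passing a negative `maxPolls` keeps the loop running for as long as the application does, which matches the "run forever" case; the bounded form is mainly useful for testing.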

Cheers
Eno

