Posted to users@kafka.apache.org by Claudia Wegmann <c....@kasasi.de> on 2018/02/05 12:51:45 UTC

Custom window implementation

Hey there,

I’m quite new to Kafka itself and to the Streams API, so please pardon any shortcomings.

What I need is a sliding window that gives me access to the data of, e.g., the last half hour relative to the current data element. As far as I understand, the Streams API does not provide such a window.

Therefore I tried to implement my own transformer. It is backed by a custom window store that knows the window size and therefore only needs a single timestamp to fetch data. This custom window store is in turn backed by a persistent window store:


// persistentWindowStore(name, retentionPeriod, numSegments, windowSize, retainDuplicates)
super(Stores.persistentWindowStore(name, windowSizeMillis, 2, windowSizeMillis, false), Serdes.String(), new ImportDataSerde(), Time.SYSTEM);
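The single-timestamp fetch of such a custom store can be pictured with the minimal sketch below. It is an in-memory model using only the JDK (one TreeMap per key), not Kafka's WindowStore; the class SlidingWindowModel and its methods are hypothetical names. fetch(key, windowEnd) turns the one timestamp into the range [windowEnd - windowSize, windowEnd]. A put with the same key and timestamp overwrites the earlier value, which is how I understand retainDuplicates = false to behave, but that is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model (JDK only, not Kafka's WindowStore) of a store
// that knows its window size, so a fetch only needs the window end time.
final class SlidingWindowModel<V> {
    private final long windowSizeMillis;
    // key -> (timestamp -> value); a second put with the same key and
    // timestamp replaces the first one.
    private final Map<String, TreeMap<Long, V>> data = new HashMap<>();

    SlidingWindowModel(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    void put(String key, V value, long timestamp) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Values for the key whose timestamps lie in [windowEnd - size, windowEnd],
    // in timestamp order. A real wrapper would presumably delegate to the inner
    // store's fetch(key, timeFrom, timeTo) with these same bounds.
    List<V> fetch(String key, long windowEnd) {
        TreeMap<Long, V> byTime = data.get(key);
        if (byTime == null) {
            return new ArrayList<>();
        }
        long windowStart = windowEnd - windowSizeMillis;
        return new ArrayList<>(byTime.subMap(windowStart, true, windowEnd, true).values());
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        SlidingWindowModel<String> store = new SlidingWindowModel<>(30 * minute);
        store.put("sensor-1", "a", 0L);
        store.put("sensor-1", "b", 20 * minute);
        store.put("sensor-1", "c", 45 * minute);
        // The window for "c" is [15 min, 45 min], so "a" at 0 min is excluded.
        System.out.println(store.fetch("sensor-1", 45 * minute)); // [b, c]
    }
}
```

In this model a put followed by a fetch at the same timestamp always returns at least the value just stored, because nothing ever expires; a real persistent store adds retention on top of this.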

The implementation of the transformer looks something like this:

@Override
public KeyValue<String, ImportData> transform(final String key, final ImportData data) {
    final long windowEndTime = data.getMetaData().getTimestamp().toInstant().toEpochMilli();

    // add the current value to the window store
    getStateStore().put(key, data, windowEndTime);

    // fetch all data for the given key in the window ending at windowEndTime
    final Collection<ImportData> importDataList = getStateStore().fetch(key, windowEndTime);

    // forward a new ImportData containing all fetched entries
    return KeyValue.pair(key, new ImportData(importDataList));
}

For each incoming data element, I first extract its timestamp, then add the element to the store, and finally fetch the data of its window.

This works as expected in tests. However, after installing the service on my server and processing some real data, the fetch sometimes returns no data at all.

So here are my questions:
1.) How is it even possible for the fetch to return no elements? I have just put in the current value, so at the very least I would expect it to return that value.

2.) My current guess is that the window has already deleted all the data because I’m using the wrong time semantics. How does the timestamp extractor influence windows?

3.) To build the backing window store I call Stores.persistentWindowStore(). The parameters of this method are quite unclear to me:
a) What are the segments of the store? How do I choose a good number of segments?
b) Does the retention time start at the beginning of the window or at the end of it?
c) Why does the store even need to know about the window size?
d) How does the store identify duplicates?
(Sorry for these probably rather stupid questions; I guess I haven’t understood all the logic of windows yet.)

4.) Is there an easier way to implement this logic? Did I miss a predefined window that gives me what I need?
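Questions 1 and 3a/3b can be made more concrete with a small model of how a segmented window store might decide whether a put is kept at all. This is a hypothetical sketch, not Kafka code: SegmentModel and accept() are illustrative names, and the arithmetic is my reading of the RocksDB-backed window store in Kafka 1.x, so it should be checked against the source of the version in use. The assumed rules are: the segment interval is max(retentionPeriod / (numSegments - 1), 60,000 ms), a record's segment id is timestamp / interval, and a put is silently dropped unless its segment id is greater than the newest segment id seen so far minus numSegments.

```java
// Hypothetical model of segment bookkeeping in a segmented window store,
// based on my reading of Kafka 1.x; verify against your version's source.
final class SegmentModel {
    // Assumed lower bound on the segment interval (one minute).
    static final long MIN_SEGMENT_INTERVAL = 60_000L;

    private final int numSegments;
    private final long segmentInterval;
    // Newest segment id seen so far; shared by ALL keys in the store.
    private long maxSegmentId = -1L;

    SegmentModel(long retentionPeriod, int numSegments) {
        // The interval formula divides by (numSegments - 1).
        if (numSegments < 2) {
            throw new IllegalArgumentException("numSegments must be >= 2");
        }
        this.numSegments = numSegments;
        this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
    }

    long segmentInterval() {
        return segmentInterval;
    }

    long segmentId(long timestamp) {
        return timestamp / segmentInterval;
    }

    // True if a put with this timestamp would be stored, false if it would be
    // silently dropped because its segment is already considered expired.
    boolean accept(long timestamp) {
        long id = segmentId(timestamp);
        if (id > maxSegmentId - numSegments) {
            maxSegmentId = Math.max(maxSegmentId, id);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long halfHour = 30 * 60 * 1000L;
        // Same settings as the snippet above: retention = window size, 2 segments.
        SegmentModel store = new SegmentModel(halfHour, 2);
        System.out.println(store.segmentInterval());        // 1800000
        System.out.println(store.accept(10 * halfHour));    // true: newest segment is now 10
        System.out.println(store.accept(9 * halfHour));     // true: segment 9 > 10 - 2
        System.out.println(store.accept(8 * halfHour + 1)); // false: segment 8 has expired
    }
}
```

Under these assumptions the store's notion of time is driven by the newest timestamp put into it across all keys, so a record whose payload timestamp lags far behind (for example late or out-of-order real data) could be dropped on put, and a fetch right afterwards would then come back empty. In this model, a retention period larger than the window size widens the range of accepted timestamps.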

Thanks for reading this. I’d be glad about any answers or pointers to documentation explaining this.

By the way: I quite like Kafka and Kafka Streams so far 😊

Best,
Claudia