Posted to users@kafka.apache.org by Hamza HACHANI <ha...@supcom.tn> on 2016/11/02 20:12:37 UTC

Re: windowing with the processor api

Thanks a lot.
This was very helpful.

Hamza



----- Reply message -----
From: "Eno Thereska" <en...@gmail.com>
To: "users@kafka.apache.org" <us...@kafka.apache.org>
Subject: windowing with the processor api
Date: Wed, Nov 2, 2016 19:18

Thanks Matthias. Yes, to get window operations such as hopping or sliding windows, you need to use the DSL (e.g., the TimeWindows class). The Processor API is very basic (and thus flexible), but with it you'll end up re-implementing TimeWindows.
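
To give a rough idea of what re-implementing the window assignment by hand would involve, here is a minimal sketch in plain Java with no Kafka dependency. The class and method names (HoppingWindowSketch, windowStartsFor) are hypothetical, and the alignment of window starts to multiples of the advance, counted from 0, is an assumption made for illustration; it is not taken from the Kafka Streams source. Timestamps are assumed to be non-negative milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

class HoppingWindowSketch {

    // Returns the start timestamps of every window [start, start + sizeMs)
    // that contains timestampMs. Window starts are assumed to be aligned
    // to multiples of advanceMs, beginning at 0 (illustrative assumption).
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        if (sizeMs <= 0 || advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException("require 0 < advanceMs <= sizeMs");
        }
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long earliest = Math.max(0L, timestampMs - sizeMs + advanceMs);
        long start = earliest / advanceMs * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10 s windows advancing every 5 s; a record at t = 12 s
        // belongs to the windows starting at 5 s and 10 s.
        System.out.println(windowStartsFor(12_000L, 10_000L, 5_000L)); // prints [5000, 10000]
    }
}
```

With advanceMs equal to sizeMs this degenerates to tumbling windows, where each record falls into exactly one window. The DSL handles this bookkeeping for you, which is why it is the recommended route.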

Eno

> On 2 Nov 2016, at 17:45, Matthias J. Sax <ma...@confluent.io> wrote:
>
> A windowed store does not work the way you expect. The parameter
> "windowSize" is not a store parameter itself, but a caching parameter
> for the store (only used if caching gets enabled).
>
> For window support, Streams provides window semantics on top of the
> store; the store itself is not aware of windows in this sense. Each
> window gets an ID that is encoded in the store key as
> "record-key:window-ID", and record timestamps are mapped to window IDs
> to find the correct window a record gets put into. So the store is
> still a plain key-value store and is not aware of any windowing.
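
The key scheme described above can be sketched in plain Java. This is only an illustration of the idea: the names (WindowedKeySketch, windowIdFor, storeKey, count) are hypothetical, the window ID is assumed to be the index of a fixed-size (tumbling) time bucket, and the literal "record-key:window-ID" string stands in for whatever encoding Streams actually uses internally. A HashMap plays the role of the plain key-value store.

```java
import java.util.HashMap;
import java.util.Map;

class WindowedKeySketch {

    // Assumed mapping: the window ID is the index of the fixed-size
    // time bucket that the record timestamp falls into.
    static long windowIdFor(long timestampMs, long windowSizeMs) {
        return timestampMs / windowSizeMs;
    }

    // Composite key in the "record-key:window-ID" shape; the store only
    // ever sees this opaque key and knows nothing about windows.
    static String storeKey(String recordKey, long timestampMs, long windowSizeMs) {
        return recordKey + ":" + windowIdFor(timestampMs, windowSizeMs);
    }

    // Counts one record in the window bucket chosen by its timestamp.
    static void count(Map<String, Long> store, String recordKey,
                      long timestampMs, long windowSizeMs) {
        store.merge(storeKey(recordKey, timestampMs, windowSizeMs), 1L, Long::sum);
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        long windowSizeMs = 10_000L;
        count(store, "user-1", 1_000L, windowSizeMs);
        count(store, "user-1", 9_000L, windowSizeMs);
        count(store, "user-1", 12_000L, windowSizeMs);
        System.out.println(store.get("user-1:0")); // prints 2
        System.out.println(store.get("user-1:1")); // prints 1
    }
}
```

All windowing logic lives in the code that builds the key, which matches the point that the store itself stays a plain key-value store.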
>
> I would highly recommend using the DSL for window operations. This
> should not be a limitation, as you can mix and match the DSL and the
> Processor API: everything you can do with the plain Processor API you
> can also do within the DSL via .process(...).
>
>
> -Matthias
>
> On 11/2/16 3:49 AM, Hamza HACHANI wrote:
>> Hi Eno,
>>
>>
>> What I mean is that I can't find where to define the size of the
>> window or where to specify the advance interval.
>>
>>
>> Hamza
>>
>> Thanks
>>
>> ________________________________
>> From: Eno Thereska <en...@gmail.com>
>> Sent: Tuesday, 1 November 2016 22:44:47
>> To: users@kafka.apache.org
>> Subject: Re: windowing with the processor api
>>
>> Hi Hamza,
>>
>> Are you getting a particular error? Here is an example:
>>
>> Stores.create("window-store")
>>     .withStringKeys()
>>     .withStringValues()
>>     .persistent()
>>     .windowed(10, 10, 2, false)
>>     .build(), "the-processor")
>>
>> Thanks Eno
>>
>>> On 2 Nov 2016, at 08:19, Hamza HACHANI <ha...@supcom.tn>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I would like to know if somebody has an idea how to define the
>>> size of the window in the processor api.
>>>
>>> I've been blocked for 6 days looking for a solution.
>>>
>>> using :
>>>
>>> Stores.create(...).withStringKeys().withStringValues().persistent().windowed(...).build()
>>>
>>>
>>>
>>> I was able to define the retention time but not the size of the window.
>>>
>>> Please help me if possible.
>>>
>>> Thanks,
>>>
>>> Hamza
>>
>>