Posted to users@kafka.apache.org by Hamza HACHANI <ha...@supcom.tn> on 2016/10/21 14:42:58 UTC

customised event time

Hi,


I would like to process data based on a customised event time (a timestamp that I include as part of the message).

The data is processed in periodic windows of length x, which are configured via the punctuate method.

What I need is a retention time for the window so that late-arriving messages can still be handled.

Can I define or configure a retention time for windows? For example, the window that covers data between 15:00 and 16:00 would forward its result not at 16:00 but at 16:15.

Thanks in advance for your help.


Hamza


Re: customised event time

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks! :)

On Tue, Oct 25, 2016 at 12:20 AM, Hamza HACHANI <ha...@supcom.tn>
wrote:

> Thank you, Guozhang.
>
> Have a nice day.

-- 
-- Guozhang

RE: customised event time

Posted by Hamza HACHANI <ha...@supcom.tn>.
Thank you, Guozhang.

Have a nice day.

________________________________
From: Guozhang Wang <wa...@gmail.com>
Sent: Monday, 24 October 2016 16:50:45
To: users@kafka.apache.org
Subject: Re: customised event time

Hi Hamza,

You can create a windowed store in the processor API via the Stores factory
class: org.apache.kafka.streams.state.Stores

More specifically, you can do something like:

Stores.create().withKeys().withValues().persistent().windowed(/* you can
specify window size, retention period etc here */)


This returns a RocksDBWindowStoreSupplier.

Guozhang



Re: customised event time

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Hamza,

You can create a windowed store in the processor API via the Stores factory
class: org.apache.kafka.streams.state.Stores

More specifically, you can do something like:

Stores.create().withKeys().withValues().persistent().windowed(/* you can
specify window size, retention period etc here */)


This returns a RocksDBWindowStoreSupplier.

Guozhang
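
To illustrate what a window size combined with a retention period makes possible in the Processor API, here is a minimal Java sketch. It deliberately does not use Kafka Streams: FinalResultWindows, add() and flush() are hypothetical names, and a TreeMap stands in for the windowed store. The assumption is that flush() would be driven from punctuate(), and that a window's result is forwarded only once its end plus a grace period has passed, so the 15:00 to 16:00 window is forwarded at 16:15 and no intermediate results are emitted.

```java
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a windowed store; not a Kafka Streams class. */
final class FinalResultWindows {
    private final long sizeMs;   // window length
    private final long graceMs;  // how long a window stays open after its end
    private final TreeMap<Long, Long> countsByWindowStart = new TreeMap<>();
    private long lastFlushMs = Long.MIN_VALUE; // time of the most recent flush

    FinalResultWindows(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    /** Counts one record by its event time; returns false if its window was already forwarded. */
    boolean add(long eventTimeMs) {
        long windowStart = eventTimeMs - Math.floorMod(eventTimeMs, sizeMs);
        if (windowStart + sizeMs + graceMs <= lastFlushMs) {
            return false; // too late: the final result for this window is already out
        }
        countsByWindowStart.merge(windowStart, 1L, Long::sum);
        return true;
    }

    /** Removes and returns every window whose end plus grace is at or before nowMs. */
    SortedMap<Long, Long> flush(long nowMs) {
        lastFlushMs = Math.max(lastFlushMs, nowMs);
        SortedMap<Long, Long> closedView =
                countsByWindowStart.headMap(lastFlushMs - sizeMs - graceMs, true);
        SortedMap<Long, Long> closed = new TreeMap<>(closedView);
        closedView.clear(); // clearing the view also removes the entries from the store
        return closed;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L, minute = 60_000L;
        FinalResultWindows windows = new FinalResultWindows(hour, 15 * minute);
        windows.add(15 * hour + 10 * minute);
        windows.add(15 * hour + 50 * minute);
        System.out.println(windows.flush(16 * hour));               // {} (still within grace)
        System.out.println(windows.add(15 * hour + 30 * minute));   // true (late but accepted)
        System.out.println(windows.flush(16 * hour + 15 * minute)); // {54000000=3}
        System.out.println(windows.add(15 * hour + 45 * minute));   // false (window closed)
    }
}
```

The trade-off in this sketch is that a record arriving after its window has been flushed is rejected; a longer grace period accepts more late records at the cost of later results.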


On Mon, Oct 24, 2016 at 2:23 AM, Hamza HACHANI <ha...@supcom.tn>
wrote:

> And the start time and end time of the window.
>
> In other words, I need the notion of windows in the Processor API.
>
> Is this possible?

-- 
-- Guozhang

RE: customised event time

Posted by Hamza HACHANI <ha...@supcom.tn>.
And the start time and end time of the window.

In other words, I need the notion of windows in the Processor API.

Is this possible?

________________________________
From: Hamza HACHANI <ha...@supcom.tn>
Sent: Sunday, 23 October 2016 20:43:05
To: users@kafka.apache.org
Subject: RE: customised event time

To be more specific:

What I really need is the retention-time property for the window in the Processor API.

As for the windowing itself, I think I can manage that on my own.


Hamza


RE: customised event time

Posted by Hamza HACHANI <ha...@supcom.tn>.
To be more specific:

What I really need is the retention-time property for the window in the Processor API.

As for the windowing itself, I think I can manage that on my own.


Hamza

________________________________
From: Hamza HACHANI <ha...@supcom.tn>
Sent: Sunday, 23 October 2016 20:30:13
To: users@kafka.apache.org
Subject: RE: customised event time

Hi,

I think I may be asking a lot.

But I need windowing in the Processor API, not in the Streams DSL. Is this possible?

My second question is: how can I get rid of the intermediate results? I am only interested in the final result produced by the window.

Hamza


RE: customised event time

Posted by Hamza HACHANI <ha...@supcom.tn>.
Hi,

I think I may be asking a lot.

But I need windowing in the Processor API, not in the Streams DSL. Is this possible?

My second question is: how can I get rid of the intermediate results? I am only interested in the final result produced by the window.

Hamza

________________________________
From: Matthias J. Sax <ma...@confluent.io>
Sent: Saturday, 22 October 2016 16:12:45
To: users@kafka.apache.org
Subject: Re: customised event time

Hi,

You can set the window retention time via the method Windows#until();
this retention time is based on the timestamps returned by your custom
timestamp extractor. All windows are kept until the retention time
passes, so late-arriving records are still processed correctly.

However, Kafka Streams does not close windows as other frameworks do.
Instead, it gives you an (intermediate) result each time a window is
updated with a new record, regardless of whether the record is in order
or late; you get a result record in both cases.

As of Kafka 0.10.1, those (intermediate) results are deduplicated, so
you might not receive every (intermediate) result downstream. It is
guaranteed, however, that the latest/final result is eventually sent
downstream.


-Matthias


Re: customised event time

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

You can set the window retention time via the method Windows#until();
this retention time is based on the timestamps returned by your custom
timestamp extractor. All windows are kept until the retention time
passes, so late-arriving records are still processed correctly.

However, Kafka Streams does not close windows as other frameworks do.
Instead, it gives you an (intermediate) result each time a window is
updated with a new record, regardless of whether the record is in order
or late; you get a result record in both cases.

As of Kafka 0.10.1, those (intermediate) results are deduplicated, so
you might not receive every (intermediate) result downstream. It is
guaranteed, however, that the latest/final result is eventually sent
downstream.


-Matthias
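
The behaviour described in this message can be sketched in plain Java without any Kafka dependency. Everything here is hypothetical and simplified: RetainedWindowCounts, process() and extractTimestamp() are made-up names, the message layout "<epochMillis>,<payload>" is an assumption, and the retention rule (a window is kept while its start is within the retention period of the highest timestamp seen so far) only approximates how Kafka Streams actually expires windows. The sketch shows that every accepted record, in order or late, yields an updated intermediate count, and that only records for expired windows are dropped.

```java
import java.util.OptionalLong;
import java.util.TreeMap;

/** Hypothetical illustration of per-update results with retention; not a Kafka Streams class. */
final class RetainedWindowCounts {
    private final long sizeMs;      // window length
    private final long retentionMs; // how long a window is kept, measured in event time
    private final TreeMap<Long, Long> countsByWindowStart = new TreeMap<>();
    private long streamTimeMs = Long.MIN_VALUE; // highest event timestamp seen so far

    RetainedWindowCounts(long sizeMs, long retentionMs) {
        this.sizeMs = sizeMs;
        this.retentionMs = retentionMs;
    }

    /** Assumed layout "<epochMillis>,<payload>"; plays the role of a custom timestamp extractor. */
    static long extractTimestamp(String message) {
        return Long.parseLong(message.substring(0, message.indexOf(',')));
    }

    /** Returns the updated count of the record's window, or empty if that window has expired. */
    OptionalLong process(String message) {
        long eventTimeMs = extractTimestamp(message);
        streamTimeMs = Math.max(streamTimeMs, eventTimeMs);
        long oldestRetainedStart = streamTimeMs - retentionMs;
        // Expire every window that starts before the retention horizon.
        countsByWindowStart.headMap(oldestRetainedStart, false).clear();
        long windowStart = eventTimeMs - Math.floorMod(eventTimeMs, sizeMs);
        if (windowStart < oldestRetainedStart) {
            return OptionalLong.empty(); // the record belongs to an expired window
        }
        return OptionalLong.of(countsByWindowStart.merge(windowStart, 1L, Long::sum));
    }

    public static void main(String[] args) {
        // One-hour windows kept for two hours of event time.
        RetainedWindowCounts counts = new RetainedWindowCounts(3_600_000L, 7_200_000L);
        System.out.println(counts.process("54600000,a")); // in order: first update of its window
        System.out.println(counts.process("58000000,b")); // in order: opens the next window
        System.out.println(counts.process("55000000,c")); // late but retained: first window updated again
        System.out.println(counts.process("64000000,d")); // advances stream time, expiring the first window
        System.out.println(counts.process("54700000,e")); // too late: its window is gone
    }
}
```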

On 10/21/16 7:42 AM, Hamza HACHANI wrote:
> Hi,
> 
> 
> I would like to process data based on a customised event time (a
> timestamp that I include as part of the message).
> 
> The data is processed in periodic windows of length x, which are
> configured via the punctuate method.
> 
> What I need is a retention time for the window so that late-arriving
> messages can still be handled.
> 
> Can I define or configure a retention time for windows? For
> example, the window that covers data between 15:00 and 16:00 would
> forward its result not at 16:00 but at 16:15.
> 
> Thanks in advance for your help.
> 
> 
> Hamza
> 
> 