Posted to users@kafka.apache.org by Peter Sinoros Szabo <Pe...@hu.ibm.com> on 2017/05/12 15:59:30 UTC

Order of punctuate() and process() in a stream processor

Hi,


Let's assume the following case.
- a stream processor that uses the Processor API
- context.schedule(1000) is called in the init()
- the processor reads only one topic that has one partition
- using a custom timestamp extractor, but that timestamp is just the
wall-clock time


Imagine the following events:
1., for 10 seconds I send 5 messages / second
2., I do not send any messages for 3 seconds
3., I start sending 5 messages / second again

I see that punctuate() is not called during the 3 seconds when I do not
send any messages. This is OK according to the documentation, because
there are no new messages to trigger the punctuate() call. When the
first few messages arrive after sending resumes (point 3. above), I
see the following sequence of method calls:

1., process() on the 1st message
2., punctuate() is called 3 times
3., process() on the 2nd message
4., process() on each following message

What I would expect instead is that punctuate() is called first and then
process() is called on the messages, because the first message's timestamp
is already 3 seconds later than the last punctuate() call, so the
first message belongs after the 3 punctuate() calls.

Please let me know whether this is a bug or intentional. If it is
intentional, what is the reason for processing one message before
punctuate() is called?


Thanks,
Peter

Péter Sinóros-Szabó
Software Engineer

Ustream, an IBM Company
Andrassy ut 39, H-1061 Budapest
Mobile: +36203693050
Email: peter.sinoros-szabo@hu.ibm.com
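
The call sequence reported in the message above can be reproduced with a small toy model. This is only a sketch in plain Python, not Kafka Streams code. It assumes, as the reply quoted later in this thread explains, that stream time advances only after a record has been processed, and that each overdue punctuation fires with the current stream time. The function name `simulate` and the millisecond values are made up for illustration.

```python
def simulate(record_timestamps, interval_ms, start_ms):
    """Return the order of process()/punctuate() calls in the toy model."""
    calls = []
    stream_time = start_ms
    next_punctuation = start_ms + interval_ms  # first scheduled punctuation
    for ts in record_timestamps:
        # The record is processed first ...
        calls.append(("process", ts))
        # ... and only afterwards is stream time advanced.
        stream_time = max(stream_time, ts)
        # Every overdue punctuation fires now, all with the same stream time.
        while next_punctuation <= stream_time:
            calls.append(("punctuate", stream_time))
            next_punctuation += interval_ms
    return calls


# Last record before the pause at t=10s, first record after it at t=13s.
for call in simulate([10_000, 13_000, 13_200], interval_ms=1000, start_ms=9_000):
    print(call)
```

With these inputs the model reports process() for the record at 13000 first, followed by three punctuate() calls that all carry 13000, and only then process() for the next record. That matches the sequence described in the message.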


Re: Order of punctuate() and process() in a stream processor

Posted by Mahendra Kariya <ma...@go-jek.com>.
We use Kafka Streams for quite a few aggregation tasks. For instance,
counting the number of messages with a particular status in a 1-minute time
window.

We have noticed that whenever we restart a stream, we see a sudden spike in
the aggregated numbers. After a few minutes, things are back to normal.
Could the above discussion be the reason for this?

Please note that we use a custom timestamp extractor.



On Fri, May 12, 2017 at 11:24 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> I added the feedback to https://issues.apache.org/jira/browse/KAFKA-3514
>
> -Matthias
>
>
> On 5/12/17 10:38 AM, Thomas Becker wrote:
> > Thanks. I think the system-time-based punctuation scheme we were
> > discussing would not result in repeated punctuations like this, but even
> > using stream time it seems a bit odd. If you do anything in a punctuate
> > call that is relatively expensive, it's especially bad.
> >
> > ________________________________________
> > From: Matthias J. Sax [matthias@confluent.io]
> > Sent: Friday, May 12, 2017 1:18 PM
> > To: users@kafka.apache.org
> > Subject: Re: Order of punctuate() and process() in a stream processor
> >
> > Thanks for sharing.
> >
> > As punctuate is called with "stream time", you see the same time value
> > multiple times. It's again due to the coarse-grained advance of "stream
> > time".
> >
> > @Thomas: I think, the way we handle it just simplifies the
> > implementation of punctuations. I don't see any other "advantage".
> >
> >
> > I will create a JIRA to track this -- we are currently working on some
> > improvements of punctuation and time management already, and it seems to
> > be another valuable improvement.
> >
> >
> > -Matthias
> >
> >
> > On 5/12/17 10:07 AM, Peter Sinoros Szabo wrote:
> >> Well, this is also a good question, because it is triggered with the
> >> same timestamp 3 times, so in order to create my update for each of the
> >> three seconds, I will have to count the number of punctuations and
> >> calculate the missed stream times myself. It's OK for me to trigger it
> >> 3 times, but the timestamp should not be the same in each call; it
> >> should be increased by the schedule interval in each punctuate.
> >>
> >> - Sini
> >>
> >>
> >>
> >> From:   Thomas Becker <to...@Tivo.com>
> >> To:     "users@kafka.apache.org" <us...@kafka.apache.org>
> >> Date:   2017/05/12 18:57
> >> Subject:        RE: Order of punctuate() and process() in a stream
> >> processor
> >>
> >>
> >>
> >> I'm a bit troubled by the fact that it fires 3 times despite the stream
> >> time being advanced all at once; is there a scenario when this is
> >> beneficial?
> >>
> >> ________________________________________
> >> From: Matthias J. Sax [matthias@confluent.io]
> >> Sent: Friday, May 12, 2017 12:38 PM
> >> To: users@kafka.apache.org
> >> Subject: Re: Order of punctuate() and process() in a stream processor
> >>
> >> Hi Peter,
> >>
> >> It's by design. Streams internally tracks time progress (so-called
> >> "stream time"). "Stream time" gets advanced *after* processing a record.
> >>
> >> Thus, in your case, "stream time" is still at its old value until the
> >> first message of your "burst" has been processed. After that, "stream
> >> time" is advanced by 3 seconds, and thus punctuate fires 3 times.
> >>
> >> I guess we could change the design and include scheduled punctuations
> >> when advancing "stream time". But atm, we just don't do this.
> >>
> >> Does this make sense?
> >>
> >> Is this critical for your use case? Or do you just want to understand
> >> what's happening?
> >>
> >>
> >> -Matthias

Re: Order of punctuate() and process() in a stream processor

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I added the feedback to https://issues.apache.org/jira/browse/KAFKA-3514

-Matthias


RE: Order of punctuate() and process() in a stream processor

Posted by Thomas Becker <to...@Tivo.com>.
Thanks. I think the system-time-based punctuation scheme we were discussing would not result in repeated punctuations like this, but even using stream time it seems a bit odd. If you do anything in a punctuate call that is relatively expensive, it's especially bad.

________________________________

This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo Inc. may only be made by a signed written agreement.

Re: Order of punctuate() and process() in a stream processor

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks Sini!

I intended to create a new JIRA, but then changed my mind and just
piggybacked it onto the existing one, as it's highly related and we might
be able to tackle it in one effort.


-Matthias


Re: Order of punctuate() and process() in a stream processor

Posted by Peter Sinoros Szabo <Pe...@hu.ibm.com>.
Hi Michal,

yes, I know it's beyond the scope of KIP-138, but from previous messages
from Matthias I thought that he would create a new ticket; it seems
that he added it to KAFKA-3514 instead. I will update that ticket with my
thoughts.

Thanks,
Sini



From:   Michal Borowiecki <mi...@openbet.com>
To:     users@kafka.apache.org
Date:   2017/05/17 10:15
Subject:        Re: Order of punctuate() and process() in a stream 
processor



Hi Sini, 

This is beyond the scope of KIP-138, but
https://issues.apache.org/jira/browse/KAFKA-3514 exists to track such
improvements.

Thanks, 

Michal

On 17 May 2017 5:10 p.m., Peter Sinoros Szabo 
<Pe...@hu.ibm.com> wrote:

Hi,

I see, now it's clear why the repeated punctuations use the same time value
in that case.

Do you have a JIRA ticket to track improvement ideas for that?

It would be great to have an option to:
- advance the stream time before calling process() on a new record -
this would prevent processing a message in the wrong punctuation
"segment".
- use fine-grained advance of stream time for the "missed" punctuations -
this would ease the processing of burst messages after some silence.

I cannot tell whether KIP-138 solves this or not.

Regards

-Sini
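
The two options requested above can be sketched with a small toy model. This is plain Python and purely hypothetical: it illustrates the requested behaviour, not what Kafka Streams does, and `simulate_proposed` is an invented name. In this sketch stream time is advanced before process(), and every missed punctuation gets its own scheduled timestamp.

```python
def simulate_proposed(record_timestamps, interval_ms, start_ms):
    """Return the call order if punctuations were fired before process()."""
    calls = []
    next_punctuation = start_ms + interval_ms  # first scheduled punctuation
    for ts in record_timestamps:
        # Fire every punctuation that is due at or before this record's
        # timestamp, each with its own scheduled time (fine-grained advance).
        while next_punctuation <= ts:
            calls.append(("punctuate", next_punctuation))
            next_punctuation += interval_ms
        # Only then hand the record to process().
        calls.append(("process", ts))
    return calls


# Last record before the pause at t=10s, first record after it at t=13s.
for call in simulate_proposed([10_000, 13_000, 13_200], 1000, 9_000):
    print(call)
```

In this model the record at 13000 is processed only after punctuate() has been called with 11000, 12000 and 13000, so no counting of repeated punctuations is needed to recover the missed intervals.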



From:   "Matthias J. Sax" <ma...@confluent.io>
To:     users@kafka.apache.org
Date:   2017/05/12 19:19
Subject:        Re: Order of punctuate() and process() in a stream 
processor



Thanks for sharing.

As punctuate is called with "streams time" you see the same time value
multiple times. It's again due to the coarse grained advance of "stream
time".

@Thomas: I think, the way we handle it just simplifies the
implementation of punctuations. I don't see any other "advantage".


I will create a JIRA to track this -- we are currently working on some
improvements of punctuation and time management already, and it seems to
be another valuable improvement.


-Matthias


On 5/12/17 10:07 AM, Peter Sinoros Szabo wrote:
> Well, this is also a good question, because it is triggered with the 
same 
> timestamp 3 times, so in order to create my update for both three 
seconds, 
> I will have to count the number of punctuations and calculate the missed 


> stream times for myself. It's ok for me to trigger it 3 times, but the 
> timestamp should not be the same in each, but should be increased by the 


> schedule time in each punctuate.
> 
> - Sini
> 
> 
> 
> From:   Thomas Becker <to...@Tivo.com>
> To:     "users@kafka.apache.org" <us...@kafka.apache.org>
> Date:   2017/05/12 18:57
> Subject:        RE: Order of punctuate() and process() in a stream 
> processor
> 
> 
> 
> I'm a bit troubled by the fact that it fires 3 times despite the stream 
> time being advanced all at once; is there a scenario when this is 
> beneficial?
> 
> ________________________________________
> From: Matthias J. Sax [matthias@confluent.io]
> Sent: Friday, May 12, 2017 12:38 PM
> To: users@kafka.apache.org
> Subject: Re: Order of punctuate() and process() in a stream processor
> 
> Hi Peter,
> 
> It's by design. Streams internally tracks time progress (so-called
> "streams time"). "streams time" get advanced *after* processing a 
record.
> 
> Thus, in your case, "stream time" is still at its old value before it
> processed the first message of you send "burst". After that, "streams
> time" is advanced by 3 seconds, and thus, punctuate fires 3 time.
> 
> I guess, we could change the design and include scheduled punctuations
> when advancing "streams time". But atm, we just don't do this.
> 
> Does this make sense?
> 
> Is this critical for your use case? Or do you just want to understand
> what's happening?
> 
> 
> -Matthias
> 
> 
> On 5/12/17 8:59 AM, Peter Sinoros Szabo wrote:
>> Hi,
>>
>>
>> Let's assume the following case.
>> - a stream processor that uses the Processor API
>> - context.schedule(1000) is called in the init()
>> - the processor reads only one topic that has one partition
>> - using custom timestamp extractor, but that timestamp is just a wall
>> clock time
>>
>>
>> Image the following events:
>> 1., for 10 seconds I send in 5 messages / second
>> 2., does not send any messages for 3 seconds
>> 3., starts the 5 messages / second again
>>
>> I see that punctuate() is not called during the 3 seconds when I do not
>> send any messages. This is ok according to the documentation, because
>> there is not any new messages to trigger the punctuate() call. When the
>> first few messages arrives after a restart the sending (point 3. above) 


> I
>> see the following sequence of method calls:
>>
>> 1., process() on the 1st message
>> 2., punctuate() is called 3 times
>> 3., process() on the 2nd message
>> 4., process() on each following message
>>
>> What I would expect instead is that punctuate() is called first and then
>> process() is called on the messages, because the first message's timestamp
>> is already 3 seconds older then the last punctuate() was called, so the
>> first message belongs after the 3 punctuate() calls.
>>
>> Please let me know if this is a bug or intentional, in this case what is
>> the reason for processing one message before punctuate() is called?
>>
>>
>> Thanks,
>> Peter
>>
>> Péter Sinóros-Szabó
>> Software Engineer
>>
>> Ustream, an IBM Company
>> Andrassy ut 39, H-1061 Budapest
>> Mobile: +36203693050
>> Email: peter.sinoros-szabo@hu.ibm.com
>>
> 
> ________________________________
> 
> This email and any attachments may contain confidential and privileged
> material for the sole use of the intended recipient. Any review, copying,
> or distribution of this email (or any attachments) by others is
> prohibited. If you are not the intended recipient, please contact the
> sender immediately and permanently delete this email and any attachments.
> No employee or agent of TiVo Inc. is authorized to conclude any binding
> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
> Inc. may only be made by a signed written agreement.
> 
> 
> 
> 
> 









Re: Order of punctuate() and process() in a stream processor

Posted by Michal Borowiecki <mi...@openbet.com>.
Hi Sini, 

This is beyond the scope of KIP-138, but https://issues.apache.org/jira/browse/KAFKA-3514 exists to track such improvements.

Thanks, 

Michal





Re: Order of punctuate() and process() in a stream processor

Posted by Peter Sinoros Szabo <Pe...@hu.ibm.com>.
Hi,

I see, now it's clear why the repeated punctuations use the same time value 
in that case.

Do you have a JIRA ticket to track improvement ideas for that?

It would be great to have an option to:
- advance the stream time before calling process() on a new record; 
this would prevent a message from being processed in the wrong 
punctuation "segment".
- use a fine-grained advance of stream time for the "missed" 
punctuations; this would ease the processing of message bursts after 
some silence.

I cannot tell whether KIP-138 solves this or not.
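
The two requested options can be sketched as a toy model. This is only an 
illustration of the wished-for behaviour, not an existing Kafka Streams 
option; the class ProposedOrderSketch and all of its members are made-up 
names.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the REQUESTED behaviour; hypothetical, not Kafka Streams code.
class ProposedOrderSketch {
    private final long intervalMs;   // the interval that context.schedule(1000) would set
    private long nextPunctuation;    // next time at which punctuate() is due
    final List<String> calls = new ArrayList<>();

    ProposedOrderSketch(long intervalMs, long firstPunctuation) {
        this.intervalMs = intervalMs;
        this.nextPunctuation = firstPunctuation;
    }

    void onRecord(long recordTimestamp) {
        // Option 1: catch up on due punctuations BEFORE the record is processed.
        while (nextPunctuation <= recordTimestamp) {
            // Option 2: each missed punctuation reports its own scheduled time.
            calls.add("punctuate(" + nextPunctuation + ")");
            nextPunctuation += intervalMs;
        }
        calls.add("process(ts=" + recordTimestamp + ")");
    }
}
```

With a 1000 ms interval, the next punctuation due at 11000 and a record 
arriving with timestamp 13000, the recorded calls are punctuate(11000), 
punctuate(12000), punctuate(13000) and only then process(ts=13000).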

Regards

-Sini







Re: Order of punctuate() and process() in a stream processor

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for sharing.

As punctuate() is called with "stream time", you see the same time value
multiple times. Again, this is due to the coarse-grained advance of
"stream time".

@Thomas: I think the way we handle it just simplifies the
implementation of punctuations. I don't see any other "advantage".


I will create a JIRA to track this -- we are currently working on some
improvements to punctuation and time management already, and this seems
to be another valuable improvement.


-Matthias




RE: Order of punctuate() and process() in a stream processor

Posted by Peter Sinoros Szabo <Pe...@hu.ibm.com>.
Well, this is also a good question, because it is triggered with the same 
timestamp 3 times, so in order to create my update for each of the three 
seconds, I will have to count the number of punctuations and calculate the 
missed stream times myself. It's OK for me that it triggers 3 times, but 
the timestamp should not be the same in each call; it should increase by 
the schedule interval with each punctuate().
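
The counting workaround mentioned above could look roughly like the 
sketch below. PunctuationClock is a made-up helper, not part of Kafka 
Streams, and it assumes that punctuate() is invoked exactly once per 
elapsed schedule interval, as observed in this thread.

```java
// Hypothetical helper that derives a distinct logical time for every
// punctuate() call by counting calls instead of trusting the passed value.
class PunctuationClock {
    private final long intervalMs;  // same value as passed to context.schedule()
    private long nextBoundary;      // logical time of the next punctuate() call

    PunctuationClock(long intervalMs, long firstBoundary) {
        this.intervalMs = intervalMs;
        this.nextBoundary = firstBoundary;
    }

    // Call once at the start of punctuate(); the timestamp argument of
    // punctuate() is deliberately ignored because it may repeat.
    long nextLogicalTime() {
        long logical = nextBoundary;
        nextBoundary += intervalMs;
        return logical;
    }
}
```

Three consecutive punctuate() calls that all receive the same timestamp 
would then see the logical times 11000, 12000 and 13000 if the clock was 
created with an interval of 1000 and a first boundary of 11000.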

- Sini









RE: Order of punctuate() and process() in a stream processor

Posted by Thomas Becker <to...@Tivo.com>.
I'm a bit troubled by the fact that it fires 3 times despite the stream time being advanced all at once; is there a scenario when this is beneficial?


Re: Order of punctuate() and process() in a stream processor

Posted by Peter Sinoros Szabo <Pe...@hu.ibm.com>.
Hi,

It is actually critical for me (and of course I would like to understand 
it too), because with this design the "first message" will be processed 
in the wrong window/segment (I mean the timeframe between two punctuate() 
calls): it will be processed in the segment from 3 seconds earlier instead 
of in a later one. I can probably work around it by deferring its 
processing until after punctuate() has fired 3 times, but that is ugly :(
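
A different workaround than the deferral mentioned above would be to 
assign each record to a segment by its own timestamp instead of by the 
order of the process() and punctuate() calls. The sketch below is only an 
illustration: SegmentCounter and its methods are made-up names, it counts 
records per segment as an example aggregation, and it assumes 
non-negative timestamps.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: buckets records by their own timestamp.
class SegmentCounter {
    private final long intervalMs;
    private final TreeMap<Long, Integer> counts = new TreeMap<>();

    SegmentCounter(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Call from process(): the segment is chosen from the record timestamp,
    // so a record processed "too early" still lands in the right segment.
    void add(long recordTimestamp) {
        long segmentStart = (recordTimestamp / intervalMs) * intervalMs;
        counts.merge(segmentStart, 1, Integer::sum);
    }

    // Call from punctuate(): removes and returns every segment
    // [start, start + intervalMs) that ended at or before 'time'.
    Map<Long, Integer> closeSegmentsUpTo(long time) {
        Map<Long, Integer> ended = counts.headMap(time - intervalMs, true);
        Map<Long, Integer> closed = new TreeMap<>(ended);
        ended.clear();  // clearing the view also removes the entries from 'counts'
        return closed;
    }
}
```

A record with timestamp 13000 that is processed before the three delayed 
punctuate() calls is counted in the segment starting at 13000, which stays 
open until a later punctuation closes it.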

Is there a reason for advancing the "streams time" after processing a 
record and not before?

Thanks,
Peter







Re: Order of punctuate() and process() in a stream processor

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi Peter,

It's by design. Streams internally tracks time progress (so-called
"stream time"). "Stream time" gets advanced *after* processing a record.

Thus, in your case, "stream time" is still at its old value before it
processes the first message of your send "burst". After that, "stream
time" is advanced by 3 seconds, and thus punctuate() fires 3 times.
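
The ordering described above can be illustrated with a toy model. This is
a minimal sketch, not actual Kafka Streams code: the class
StreamTimeSketch and its members are made-up names, and it assumes that
all punctuations that became due observe the same stream time, as
reported elsewhere in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the described behaviour; not Kafka Streams code.
class StreamTimeSketch {
    private final long intervalMs;   // the interval that context.schedule(1000) would set
    private long streamTime;         // advanced only AFTER a record is processed
    private long nextPunctuation;    // next stream time at which punctuate() is due
    final List<String> calls = new ArrayList<>();

    StreamTimeSketch(long intervalMs, long startTime) {
        this.intervalMs = intervalMs;
        this.streamTime = startTime;
        this.nextPunctuation = startTime + intervalMs;
    }

    void onRecord(long recordTimestamp) {
        // 1. The record is processed while stream time still has its old value.
        calls.add("process(ts=" + recordTimestamp + ")");
        // 2. Only now does stream time advance.
        streamTime = Math.max(streamTime, recordTimestamp);
        // 3. Every punctuation that became due fires with the same stream time.
        while (nextPunctuation <= streamTime) {
            calls.add("punctuate(" + streamTime + ")");
            nextPunctuation += intervalMs;
        }
    }
}
```

Starting at stream time 10000 with a 1000 ms interval, a record with
timestamp 13000 yields process(ts=13000) followed by three
punctuate(13000) calls; a following record at 13200 is only processed.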

I guess we could change the design and include scheduled punctuations
when advancing "stream time". But atm, we just don't do this.

Does this make sense?

Is this critical for your use case? Or do you just want to understand
what's happening?


-Matthias

