Posted to dev@kafka.apache.org by Michal Borowiecki <mi...@openbet.com> on 2017/05/01 17:23:46 UTC

Re: [DISCUSS] KIP-138: Change punctuate semantics

Hi all,

As promised, here is my take on how one could implement the previously 
discussed hybrid semantics using the two PunctuationType callbacks (one 
for STREAM_TIME and one for SYSTEM_TIME).

However, there's a twist.

Since calling context.schedule() currently adds a new 
PunctuationSchedule and does not overwrite the previous one, a slight 
change would be required:

a) either that PunctuationSchedules are cancellable

b) or that calling schedule() overwrites (cancels) the previous one 
with the given PunctuationType (but that's not how it works currently)


Below is an example assuming approach a) is implemented by having 
schedule return Cancellable instead of void.

ProcessorContext context;
long streamTimeInterval = ...;
long systemTimeUpperBound = ...; // e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
Cancellable streamTimeSchedule;
Cancellable systemTimeSchedule;
long lastStreamTimePunctation = -1;

public void init(ProcessorContext context){
    this.context = context;
    streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
        streamTimeInterval, this::streamTimePunctuate);
    systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
        systemTimeUpperBound, this::systemTimePunctuate);
}

public void streamTimePunctuate(long streamTime){
    periodicBusiness(streamTime);

    systemTimeSchedule.cancel();
    systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
        systemTimeUpperBound, this::systemTimePunctuate);
}

public void systemTimePunctuate(long systemTime){
    periodicBusiness(context.timestamp());

    streamTimeSchedule.cancel();
    streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
        streamTimeInterval, this::streamTimePunctuate);
}

public void periodicBusiness(long streamTime){
    // guard against streamTime == -1, easy enough.
    // if you need system time instead, just use System.currentTimeMillis()

    // do something businessy here
}

Where Cancellable is either an interface containing just a single void 
cancel() method, or one that also has boolean isCancelled(), like here: 
<http://doc.akka.io/japi/akka/2.5.0/akka/actor/Cancellable.html>.
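
To make the cancel-and-reschedule interplay easier to follow, here is a 
minimal, self-contained sketch that can be run without Kafka. ToyContext, 
its advance() method and the HybridSketch class are hypothetical stand-ins 
invented for illustration; they are not the ProcessorContext API, and the 
intervals (10 and 15) are arbitrary. Only the shape of schedule() returning 
a Cancellable follows the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

enum PunctuationType { STREAM_TIME, SYSTEM_TIME }

/** Handle for one schedule, as in option a); only cancel() is assumed here. */
interface Cancellable { void cancel(); }

/** Toy stand-in for ProcessorContext (hypothetical, not the Kafka Streams class). */
class ToyContext {
    private final class Schedule implements Cancellable {
        final PunctuationType type;
        final long interval;
        final LongConsumer callback;
        long nextFire;
        boolean cancelled;
        Schedule(PunctuationType type, long interval, LongConsumer callback) {
            this.type = type;
            this.interval = interval;
            this.callback = callback;
            this.nextFire = now(type) + interval;
        }
        @Override public void cancel() { cancelled = true; }
    }

    private final List<Schedule> schedules = new ArrayList<>();
    private long streamTime, systemTime;

    long now(PunctuationType type) {
        return type == PunctuationType.STREAM_TIME ? streamTime : systemTime;
    }

    /** Adds a new schedule without overwriting earlier ones and returns its handle. */
    Cancellable schedule(PunctuationType type, long interval, LongConsumer callback) {
        Schedule s = new Schedule(type, interval, callback);
        schedules.add(s);
        return s;
    }

    /** Moves one clock forward and fires the due, non-cancelled schedules of that type. */
    void advance(PunctuationType type, long time) {
        if (type == PunctuationType.STREAM_TIME) streamTime = time; else systemTime = time;
        for (Schedule s : new ArrayList<>(schedules)) { // iterate a copy: callbacks add schedules
            if (s.type == type && !s.cancelled && s.nextFire <= time) {
                s.nextFire = time + s.interval;
                s.callback.accept(time);
            }
        }
    }
}

class HybridSketch {
    final List<String> log = new ArrayList<>();
    final ToyContext context = new ToyContext();
    final long streamTimeInterval = 10;
    final long systemTimeUpperBound = 15;
    Cancellable streamTimeSchedule;
    Cancellable systemTimeSchedule;

    void init() { armStreamTime(); armSystemTime(); }

    void armStreamTime() {
        streamTimeSchedule = context.schedule(
            PunctuationType.STREAM_TIME, streamTimeInterval, this::streamTimePunctuate);
    }

    void armSystemTime() {
        systemTimeSchedule = context.schedule(
            PunctuationType.SYSTEM_TIME, systemTimeUpperBound, this::systemTimePunctuate);
    }

    void streamTimePunctuate(long streamTime) {
        log.add("STREAM@" + streamTime);
        systemTimeSchedule.cancel(); // restart the system-time fallback
        armSystemTime();
    }

    void systemTimePunctuate(long systemTime) {
        log.add("SYSTEM@" + systemTime);
        streamTimeSchedule.cancel(); // restart the stream-time interval
        armStreamTime();
    }

    static List<String> run() {
        HybridSketch p = new HybridSketch();
        p.init();
        p.context.advance(PunctuationType.SYSTEM_TIME, 5);
        p.context.advance(PunctuationType.STREAM_TIME, 10); // fires; fallback re-armed for 5 + 15
        p.context.advance(PunctuationType.SYSTEM_TIME, 15); // silent: original fallback was cancelled
        p.context.advance(PunctuationType.SYSTEM_TIME, 20); // no stream progress since: fallback fires
        return p.log;
    }

    public static void main(String[] args) { System.out.println(run()); }
}
```

Running main prints [STREAM@10, SYSTEM@20]. Without the cancel() call in 
streamTimePunctuate, the original system-time schedule would also have 
fired at system time 15, which is exactly the double punctuation the 
Cancellable handle is meant to avoid.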


Please make your opinions known on whether we should proceed in this 
direction or leave "hybrid" considerations out of scope.

Looking forward to hearing your thoughts.

Thanks,
Michal

On 30/04/17 20:07, Michal Borowiecki wrote:
>
> Hi Matthias,
>
> I'd like to start moving the discarded ideas into the Rejected 
> Alternatives section. Before I do, I want to tidy them up and ensure 
> they've each been given proper treatment.
>
> To that end let me go back to one of your earlier comments about the 
> original suggestion (A) to put that to bed.
>
>
> On 04/04/17 06:44, Matthias J. Sax wrote:
>> (A) You argue, that users can still "punctuate" on event-time via
>> process(), but I am not sure if this is possible. Note, that users only
>> get record timestamps via context.timestamp(). Thus, users would need to
>> track the time progress per partition (based on the partitions they
>> observe via context.partition()). (This alone puts a huge burden on the
>> user by itself.) However, users are not notified at startup what
>> partitions are assigned, and users are not notified when partitions get
>> revoked. Because this information is not available, it's not possible to
>> "manually advance" stream-time, and thus event-time punctuation within
>> process() seems not to be possible -- or do you see a way to get it
>> done? And even if, it might still be too clumsy to use.
> I might have missed something, but I'm guessing your worry about users 
> having to track time progress /per partition/ comes from what 
> stream-time does currently.
> But I'm not sure that those semantics of stream-time are ideal for 
> users of punctuate.
> That is, if stream-time punctuate didn't exist and users had to use 
> process(), would they actually want to use the current semantics of 
> stream time?
>
> As a reminder, stream time, in all its glory, is as follows (this is not 
> exact: while trying to be absolutely precise here I spotted KAFKA-5144 
> <https://issues.apache.org/jira/browse/KAFKA-5144>, so I think this 
> approximation suffices to illustrate the point):
>
> a minimum across all input partitions of (
>    if(msgs never received by partition) -1;
>    else {
>       a non-descending-minimum of ( the per-batch minimum msg timestamp)
>    }
> )
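
For readers who prefer code to the formula above, here is one possible 
reading of it as a small Java sketch. It assumes that "non-descending 
minimum" means the per-batch minimum timestamp, tracked so that it never 
moves backwards; the class and method names are made up for illustration 
and are not taken from the Kafka Streams code base.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class StreamTimeSketch {
    /** Partition time: -1 until a batch arrives, then the per-batch minimum, never decreasing. */
    static long partitionTime(List<Long> perBatchMinTimestamps) {
        long time = -1L;
        for (long batchMin : perBatchMinTimestamps) {
            time = Math.max(time, batchMin); // a late batch cannot move time backwards
        }
        return time;
    }

    /** Stream time: the minimum partition time across all input partitions. */
    static long streamTime(List<List<Long>> partitions) {
        long min = Long.MAX_VALUE;
        for (List<Long> batches : partitions) {
            min = Math.min(min, partitionTime(batches));
        }
        return partitions.isEmpty() ? -1L : min;
    }

    public static void main(String[] args) {
        List<Long> p0 = Arrays.asList(100L, 90L, 120L); // per-batch minimum timestamps
        List<Long> p1 = Arrays.asList(80L);
        List<Long> idle = Collections.emptyList();      // no messages received yet
        System.out.println(streamTime(Arrays.asList(p0, p1)));       // 80
        System.out.println(streamTime(Arrays.asList(p0, p1, idle))); // -1
    }
}
```

Note how a single partition that has never received a message pins the 
result at -1, regardless of how far the other partitions have advanced.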
>
> Would that really be clear enough to the users of punctuate? Do they 
> care for such a convoluted notion of time? I see how this can be 
> useful for StreamTask to pick the next partition to take a record from 
> but for punctuate?
> If users had to implement punctuation with process(), is that what 
> they would have chosen as their notion of time?
> I'd argue not.
>
> None of the processors implementing the rich windowing/join operations 
> in the DSL use punctuate.
> Let's take the KStreamKStreamJoinProcessor as an example: in its 
> process() method it simply uses context().timestamp(), which, since 
> it's called from process(), returns, per the javadoc:
> If it is triggered while processing a record streamed from the source 
> processor, timestamp is defined as the timestamp of the current input 
> record;
> So they don't use that convoluted formula for stream-time. Instead, 
> they only care about the timestamp of the current record. I think that 
> having users track just that wouldn't be that much of a burden. I 
> don't think they need to care about which partitions got assigned or 
> not. And StreamTask would still be picking records first from the 
> partition having the lowest timestamp to try to "synchronize" the 
> streams as it does now.
>
> What users would have to do in their Processor implementations is 
> somewhere along the lines of:
>
> long lastPunctuationTime = 0;
> long interval = <some-number>; //millis
>
> @Override
> public void process(K key, V value){
>     while (ctx.timestamp() >= lastPunctuationTime + interval){
>         punctuate(ctx.timestamp());
>         // I'm not sure of the merit of this vs
>         // lastPunctuationTime = ctx.timestamp(); but that's what
>         // PunctuationQueue does currently
>         lastPunctuationTime += interval;
>     }
>     // do some other business logic here
> }
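
The loop above can be tried out in isolation with the sketch below. It is 
only an illustration: the record timestamp is passed in as a parameter in 
place of ctx.timestamp(), punctuate() just records its argument, and the 
class name is invented. One deliberate deviation, which is my assumption 
and not part of the snippet above: lastPunctuationTime is anchored to the 
first record seen, because starting from 0 with epoch-millisecond 
timestamps would make the first record run the loop once per interval 
since 1970.

```java
import java.util.ArrayList;
import java.util.List;

class ProcessPunctuationSketch {
    private final long interval;             // millis
    private long lastPunctuationTime = -1L;  // unset until the first record arrives
    final List<Long> punctuations = new ArrayList<>();

    ProcessPunctuationSketch(long interval) {
        this.interval = interval;
    }

    /** recordTimestamp stands in for ctx.timestamp() of the record being processed. */
    void process(long recordTimestamp) {
        if (lastPunctuationTime < 0) {
            lastPunctuationTime = recordTimestamp; // assumption: anchor to the first record
        }
        while (recordTimestamp >= lastPunctuationTime + interval) {
            punctuate(recordTimestamp);
            lastPunctuationTime += interval;       // catch up one interval at a time
        }
        // other business logic would go here
    }

    void punctuate(long timestamp) {
        punctuations.add(timestamp);
    }

    static List<Long> run() {
        ProcessPunctuationSketch p = new ProcessPunctuationSketch(10);
        for (long ts : new long[] {1000, 1005, 1012, 1035}) {
            p.process(ts);
        }
        return p.punctuations;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1012, 1035, 1035]
    }
}
```

The record at 1035 triggers two punctuations because two interval 
boundaries (1020 and 1030) were crossed since the previous one. That is 
the catch-up behaviour of the "+= interval" variant; assigning 
lastPunctuationTime = ctx.timestamp() instead would punctuate once.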
>
> Looking forward to your thoughts.
>
> Cheers,
> Michal
>
> -- 
> Signature
> <http://www.openbet.com/> 	Michal Borowiecki
> Senior Software Engineer L4
> 	T: 	+44 208 742 1600
>
> 	
> 	+44 203 249 8448
>
> 	
> 	
> 	E: 	michal.borowiecki@openbet.com
> 	W: 	www.openbet.com <http://www.openbet.com/>
>
> 	
> 	OpenBet Ltd
>
> 	Chiswick Park Building 9
>
> 	566 Chiswick High Rd
>
> 	London
>
> 	W4 5XT
>
> 	UK
>
> 	
> <https://www.openbet.com/email_promo>
>
> This message is confidential and intended only for the addressee. If 
> you have received this message in error, please immediately notify the 
> postmaster@openbet.com <ma...@openbet.com> and delete it 
> from your system as well as any copies. The content of e-mails as well 
> as traffic data may be monitored by OpenBet for employment and 
> security purposes. To protect the environment please do not print this 
> e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park 
> Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A 
> company registered in England and Wales. Registered no. 3134634. VAT 
> no. GB927523612
>



Re: [DISCUSS] KIP-138: Change punctuate semantics

Posted by Damian Guy <da...@gmail.com>.
Thanks Michal - LGTM

On Fri, 5 May 2017 at 12:04 Michal Borowiecki <mi...@openbet.com>
wrote:

> I shall move all alternatives other than the main proposal into the
> Rejected Alternatives section and if I hear any objections, I'll move those
> back up and we'll discuss further.
>
> Done.
>
>
> Still looking forward to any comments, especially about the recently
> proposed ability to cancel punctuation schedules. I think it goes well in
> the spirit of making complex things possible (such as the hybrid semantics).
>
>
> In the absence of further comments I shall call for a vote in the next few
> days.
>
>
> Thanks,
>
> Michał
>
> On 04/05/17 09:41, Michal Borowiecki wrote:
>
> Further in this direction I've updated the main proposal to incorporate
> the Cancellable return type for ProcessorContext.schedule and the guidance
> on how to implement "hybrid" punctuation with the proposed 2
> PunctuationTypes.
>
> I look forward to more comments on whether the Cancellable return type is
> an agreeable solution, and on its precise definition.
>
> I shall move all alternatives other than the main proposal into the
> Rejected Alternatives section and if I hear any objections, I'll move those
> back up and we'll discuss further.
>
>
> Looking forward to all comments and suggestions.
>
>
> Thanks,
>
> Michal
>

Re: [DISCUSS] KIP-138: Change punctuate semantics

Posted by Michal Borowiecki <mi...@openbet.com>.
> I shall move all alternatives other than the main proposal into the 
> Rejected Alternatives section and if I hear any objections, I'll move 
> those back up and we'll discuss further.
Done.


Still looking forward to any comments, especially about the recently 
proposed ability to cancel punctuation schedules. I think it goes well 
in the spirit of making complex things possible (such as the hybrid 
semantics).


In the absence of further comments I shall call for a vote in the next 
few days.


Thanks,

Michał



Re: [DISCUSS] KIP-138: Change punctuate semantics

Posted by Michal Borowiecki <mi...@openbet.com>.
Hi Matthias,

Apologies, somehow I totally missed this email earlier.

Wrt ValueTransformer, I added it to the list of deprecated methods 
(PR is up to date).

Wrt Cancellable vs Cancelable:

I'm not fluent enough to have spotted this nuance, but having googled 
for it, you are right.

On the other hand however, the precedent seems to have been set by 
java.util.concurrent.Cancellable and akka for instance followed that 
with akka.actor.Cancellable.

Given the established heritage in a computing context, I'd err on the 
side of consistency with prior practice.

Unless anyone has strong opinions on this matter?


Thanks,

Michal


On 04/05/17 20:43, Matthias J. Sax wrote:
> Hi,
>
> thanks for updating the KIP. Looks good to me overall.
>
> I think adding `Cancellable` (or should it be `Cancelable` to follow
> American English?) is a clean solution, in contrast to the proposed
> alternative.
>
> One minor comment: can you add `ValueTransformer#punctuate()` to the
> list of deprecated methods?
>
>
> -Matthias
>
>
>
>> On 01/05/17 18:23, Michal Borowiecki wrote:
>>> Hi all,
>>>
>>> As promised, here is my take at how one could implement the previously
>>> discussed hybrid semantics using the 2 PunctuationType callbacks (one
>>> for STREAM_TIME and one for SYSTEM_TIME).
>>>
>>> However, there's a twist.
>>>
>>> Since currently calling context.schedule() adds a new
>>> PunctuationSchedule and does not overwrite the previous one, a slight
>>> change would be required:
>>>
>>> a) either that PuncuationSchedules are cancellable
>>>
>>> b) or that calling schedule() ||overwrites(cancels) the previous one
>>> with the given |PunctuationType |(but that's not how it works currently)
>>>
>>>
>>> Below is an example assuming approach a) is implemented by having
>>> schedule return Cancellable instead of void.
>>>
>>> |ProcessorContext context;|
>>> |long| |streamTimeInterval = ...;|
>>> |long| |systemTimeUpperBound = ...; ||//e.g. systemTimeUpperBound =
>>> streamTimeInterval + some tolerance|
>>> |Cancellable streamTimeSchedule;|
>>> |Cancellable systemTimeSchedule;|
>>> |long| |lastStreamTimePunctation = -||1||;|
>>> | |
>>> |public| |void| |init(ProcessorContext context){|
>>> |    ||this||.context = context;|
>>> |    ||streamTimeSchedule =
>>> context.schedule(PunctuationType.STREAM_TIME,
>>> streamTimeInterval,   ||this||::streamTimePunctuate);|
>>> |    ||systemTimeSchedule =
>>> context.schedule(PunctuationType.SYSTEM_TIME,
>>> systemTimeUpperBound, ||this||::systemTimePunctuate);   |
>>> |}|
>>> | |
>>> |public| |void| |streamTimePunctuate(||long| |streamTime){|
>>> |    ||periodicBusiness(streamTime);|
>>>   
>>> |    ||systemTimeSchedule.cancel();|
>>> |    ||systemTimeSchedule =
>>> context.schedule(PunctuationType.SYSTEM_TIME,
>>> systemTimeUpperBound, ||this||::systemTimePunctuate);|
>>> |}|
>>> | |
>>> |public| |void| |systemTimePunctuate(||long| |systemTime){|
>>> |    ||periodicBusiness(context.timestamp());|
>>>   
>>> |    ||streamTimeSchedule.cancel();|
>>> |    ||streamTimeSchedule =
>>> context.schedule(PunctuationType.STREAM_TIME,
>>> streamTimeInterval, ||this||::streamTimePunctuate);|
>>> |}|
>>> | |
>>> |public| |void| |periodicBusiness(||long| |streamTime){|
>>> |    ||// guard against streamTime == -1, easy enough.|
>>> |    ||// if you need system time instead, just use
>>> System.currentTimeMillis()|
>>> | |
>>> |    ||// do something businessy here|
>>> |}|
>>>
>>> Where Cancellable is either an interface containing just a single void
>>> cancel() method or also boolean isCancelled() like here
>>> <http://doc.akka.io/japi/akka/2.5.0/akka/actor/Cancellable.html>.
>>>
>>>
>>> Please let your opinions known whether we should proceed in this
>>> direction or leave "hybrid" considerations out of scope.
>>>
>>> Looking forward to hearing your thoughts.
>>>
>>> Thanks,
>>> Michal
>>>
>>> On 30/04/17 20:07, Michal Borowiecki wrote:
>>>> Hi Matthias,
>>>>
>>>> I'd like to start moving the discarded ideas into Rejected
>>>> Alternatives section. Before I do, I want to tidy them up, ensure
>>>> they've each been given proper treatment.
>>>>
>>>> To that end let me go back to one of your earlier comments about the
>>>> original suggestion (A) to put that to bed.
>>>>
>>>>
>>>> On 04/04/17 06:44, Matthias J. Sax wrote:
>>>>> (A) You argue, that users can still "punctuate" on event-time via
>>>>> process(), but I am not sure if this is possible. Note, that users only
>>>>> get record timestamps via context.timestamp(). Thus, users would need to
>>>>> track the time progress per partition (based on the partitions they
>>>>> obverse via context.partition(). (This alone puts a huge burden on the
>>>>> user by itself.) However, users are not notified at startup what
>>>>> partitions are assigned, and user are not notified when partitions get
>>>>> revoked. Because this information is not available, it's not possible to
>>>>> "manually advance" stream-time, and thus event-time punctuation within
>>>>> process() seems not to be possible -- or do you see a way to get it
>>>>> done? And even if, it might still be too clumsy to use.
>>>> I might have missed something but I'm guessing your worry about users
>>>> having to track time progress /per partition/ comes from the what the
>>>> stream-time does currently.
>>>> But I'm not sure that those semantics of stream-time are ideal for
>>>> users of punctuate.
>>>> That is, if stream-time punctuate didn't exist and users had to use
>>>> process(), would they actually want to use the current semantics of
>>>> stream time?
>>>>
>>>> As a reminder stream time, in all its glory, is (not exactly
>>>> actually, but when trying to be absolutely precise here I spotted
>>>> KAFKA-5144 <https://issues.apache.org/jira/browse/KAFKA-5144> so I
>>>> think this approximation suffices to illustrate the point):
>>>>
>>>> a minimum across all input partitions of (
>>>>     if(msgs never received by partition) -1;
>>>>     else {
>>>>        a non-descending-minimum of ( the per-batch minimum msg timestamp)
>>>>     }
>>>> )
>>>>
>>>> Would that really be clear enough to the users of punctuate? Do they
>>>> care for such a convoluted notion of time? I see how this can be
>>>> useful for StreamTask to pick the next partition to take a record
>>>> from but for punctuate?
>>>> If users had to implement punctuation with process(), is that what
>>>> they would have chosen as their notion of time?
>>>> I'd argue not.
>>>>
>>>> None of the processors implementing the rich windowing/join
>>>> operations in the DSL use punctuate.
>>>> Let's take the KStreamKStreamJoinProcessor as an example: in its
>>>> process() method it simply uses context().timestamp(), which, since
>>>> it's called from process(), returns, per the javadoc:
>>>> If it is triggered while processing a record streamed from the source
>>>> processor, timestamp is defined as the timestamp of the current input
>>>> record;
>>>> So they don't use that convoluted formula for stream-time. Instead,
>>>> they only care about the timestamp of the current record. I think
>>>> that having users track just that wouldn't be that much of a burden.
>>>> I don't think they need to care about which partitions got assigned
>>>> or not. And StreamTask would still be picking records first from the
>>>> partition having the lowest timestamp to try to "synchronize" the
>>>> streams as it does now.
>>>>
>>>> What users would have to do in their Processor implementations is
>>>> somewhere along the lines of:
>>>>
>>>> long lastPunctuationTime = 0;
>>>> long interval = <some-number>; // millis
>>>>
>>>> @Override
>>>> public void process(K key, V value) {
>>>>     while (ctx.timestamp() >= lastPunctuationTime + interval) {
>>>>         punctuate(ctx.timestamp());
>>>>         // I'm not sure of the merit of this vs
>>>>         // lastPunctuationTime = ctx.timestamp(); but that's what
>>>>         // PunctuationQueue does currently
>>>>         lastPunctuationTime += interval;
>>>>     }
>>>>     // do some other business logic here
>>>> }
>>>>
>>>> Looking forward to your thoughts.
>>>>
>>>> Cheers,
>>>> Michal
>>>>

-- 
Signature
<http://www.openbet.com/> 	Michal Borowiecki
Senior Software Engineer L4
	T: 	+44 208 742 1600

	
	+44 203 249 8448

	
	
	E: 	michal.borowiecki@openbet.com
	W: 	www.openbet.com <http://www.openbet.com/>

	
	OpenBet Ltd

	Chiswick Park Building 9

	566 Chiswick High Rd

	London

	W4 5XT

	UK

	
<https://www.openbet.com/email_promo>

This message is confidential and intended only for the addressee. If you 
have received this message in error, please immediately notify the 
postmaster@openbet.com <ma...@openbet.com> and delete it 
from your system as well as any copies. The content of e-mails as well 
as traffic data may be monitored by OpenBet for employment and security 
purposes. To protect the environment please do not print this e-mail 
unless necessary. OpenBet Ltd. Registered Office: Chiswick Park Building 
9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company 
registered in England and Wales. Registered no. 3134634. VAT no. 
GB927523612


Re: [DISCUSS] KIP-138: Change punctuate semantics

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

thanks for updating the KIP. Looks good to me overall.

I think adding `Cancellable` (or should it be `Cancelable` to follow
American English?) is a clean solution, in contrast to the proposed
alternative.

One minor comment: can you add `ValueTransformer#punctuate()` to the
list of deprecated methods?


-Matthias



On 5/4/17 1:41 AM, Michal Borowiecki wrote:
> Further in this direction I've updated the main proposal to incorporate
> the Cancellable return type for ProcessorContext.schedule and the
> guidance on how to implement "hybrid" punctuation with the proposed 2
> PunctuationTypes.
> 
> I look forward to more comments on whether the Cancellable return type is
> an agreeable solution and on its precise definition.
> 
> I shall move all alternatives other than the main proposal into the
> Rejected Alternatives section and if I hear any objections, I'll move
> those back up and we'll discuss further.
> 
> 
> Looking forward to all comments and suggestions.
> 
> 
> Thanks,
> 
> Michal
> 
> 
> On 01/05/17 18:23, Michal Borowiecki wrote:
>>
>> Hi all,
>>
>> As promised, here is my take on how one could implement the previously
>> discussed hybrid semantics using the 2 PunctuationType callbacks (one
>> for STREAM_TIME and one for SYSTEM_TIME).
>>
>> However, there's a twist.
>>
>> Since currently calling context.schedule() adds a new
>> PunctuationSchedule and does not overwrite the previous one, a slight
>> change would be required:
>>
>> a) either that PunctuationSchedules are cancellable
>>
>> b) or that calling schedule() overwrites (cancels) the previous one
>> with the given PunctuationType (but that's not how it works currently)
>>
>>
>> Below is an example assuming approach a) is implemented by having
>> schedule return Cancellable instead of void.
>>
>> ProcessorContext context;
>> long streamTimeInterval = ...;
>> // e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
>> long systemTimeUpperBound = ...;
>> Cancellable streamTimeSchedule;
>> Cancellable systemTimeSchedule;
>> long lastStreamTimePunctuation = -1;
>>
>> public void init(ProcessorContext context) {
>>     this.context = context;
>>     streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
>>         streamTimeInterval, this::streamTimePunctuate);
>>     systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
>>         systemTimeUpperBound, this::systemTimePunctuate);
>> }
>>
>> public void streamTimePunctuate(long streamTime) {
>>     periodicBusiness(streamTime);
>>
>>     systemTimeSchedule.cancel();
>>     systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
>>         systemTimeUpperBound, this::systemTimePunctuate);
>> }
>>
>> public void systemTimePunctuate(long systemTime) {
>>     periodicBusiness(context.timestamp());
>>
>>     streamTimeSchedule.cancel();
>>     streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
>>         streamTimeInterval, this::streamTimePunctuate);
>> }
>>
>> public void periodicBusiness(long streamTime) {
>>     // guard against streamTime == -1, easy enough.
>>     // if you need system time instead, just use System.currentTimeMillis()
>>
>>     // do something businessy here
>> }
>>
>> Where Cancellable is either an interface containing just a single void
>> cancel() method or also boolean isCancelled() like here
>> <http://doc.akka.io/japi/akka/2.5.0/akka/actor/Cancellable.html>.
>>
>>
>> Please make your opinions known on whether we should proceed in this
>> direction or leave "hybrid" considerations out of scope.
>>
>> Looking forward to hearing your thoughts.
>>
>> Thanks,
>> Michal
>>
>> On 30/04/17 20:07, Michal Borowiecki wrote:
>>>
>>> Hi Matthias,
>>>
>>> I'd like to start moving the discarded ideas into the Rejected
>>> Alternatives section. Before I do, I want to tidy them up and ensure
>>> they've each been given proper treatment.
>>>
>>> To that end let me go back to one of your earlier comments about the
>>> original suggestion (A) to put that to bed.
>>>
>>>
>>> On 04/04/17 06:44, Matthias J. Sax wrote:
>>>> (A) You argue, that users can still "punctuate" on event-time via
>>>> process(), but I am not sure if this is possible. Note, that users only
>>>> get record timestamps via context.timestamp(). Thus, users would need to
>>>> track the time progress per partition (based on the partitions they
>>>> observe via context.partition()). (This alone puts a huge burden on the
>>>> user by itself.) However, users are not notified at startup what
>>>> partitions are assigned, and users are not notified when partitions get
>>>> revoked. Because this information is not available, it's not possible to
>>>> "manually advance" stream-time, and thus event-time punctuation within
>>>> process() seems not to be possible -- or do you see a way to get it
>>>> done? And even if so, it might still be too clumsy to use.
>>> I might have missed something but I'm guessing your worry about users
>>> having to track time progress /per partition/ comes from what
>>> stream-time does currently.
>>> But I'm not sure that those semantics of stream-time are ideal for
>>> users of punctuate.
>>> That is, if stream-time punctuate didn't exist and users had to use
>>> process(), would they actually want to use the current semantics of
>>> stream time?
>>>
>>> As a reminder stream time, in all its glory, is (not exactly
>>> actually, but when trying to be absolutely precise here I spotted
>>> KAFKA-5144 <https://issues.apache.org/jira/browse/KAFKA-5144> so I
>>> think this approximation suffices to illustrate the point):
>>>
>>> a minimum across all input partitions of (
>>>    if(msgs never received by partition) -1;
>>>    else {
>>>       a non-descending-minimum of ( the per-batch minimum msg timestamp)
>>>    }
>>> )
>>>
>>> Would that really be clear enough to the users of punctuate? Do they
>>> care for such a convoluted notion of time? I see how this can be
>>> useful for StreamTask to pick the next partition to take a record
>>> from but for punctuate?
>>> If users had to implement punctuation with process(), is that what
>>> they would have chosen as their notion of time?
>>> I'd argue not.
>>>
>>> None of the processors implementing the rich windowing/join
>>> operations in the DSL use punctuate.
>>> Let's take the KStreamKStreamJoinProcessor as an example: in its
>>> process() method it simply uses context().timestamp(), which, since
>>> it's called from process(), returns, per the javadoc:
>>> If it is triggered while processing a record streamed from the source
>>> processor, timestamp is defined as the timestamp of the current input
>>> record;
>>> So they don't use that convoluted formula for stream-time. Instead,
>>> they only care about the timestamp of the current record. I think
>>> that having users track just that wouldn't be that much of a burden.
>>> I don't think they need to care about which partitions got assigned
>>> or not. And StreamTask would still be picking records first from the
>>> partition having the lowest timestamp to try to "synchronize" the
>>> streams as it does now.
>>>
>>> What users would have to do in their Processor implementations is
>>> somewhere along the lines of:
>>>
>>> long lastPunctuationTime = 0;
>>> long interval = <some-number>; // millis
>>>
>>> @Override
>>> public void process(K key, V value) {
>>>     while (ctx.timestamp() >= lastPunctuationTime + interval) {
>>>         punctuate(ctx.timestamp());
>>>         // I'm not sure of the merit of this vs
>>>         // lastPunctuationTime = ctx.timestamp(); but that's what
>>>         // PunctuationQueue does currently
>>>         lastPunctuationTime += interval;
>>>     }
>>>     // do some other business logic here
>>> }
>>>
>>> Looking forward to your thoughts.
>>>
>>> Cheers,
>>> Michal
>>>


Re: [DISCUSS] KIP-138: Change punctuate semantics

Posted by Michal Borowiecki <mi...@openbet.com>.
Further in this direction I've updated the main proposal to incorporate 
the Cancellable return type for ProcessorContext.schedule and the 
guidance on how to implement "hybrid" punctuation with the proposed 2 
PunctuationTypes.

I look forward to more comments on whether the Cancellable return type is
an agreeable solution and on its precise definition.
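
To make the question about the precise definition a bit more concrete, here
is a minimal sketch in plain Java (no Kafka dependency) of the two-method
shape mentioned in the quoted message below: cancel() plus isCancelled().
ToyScheduler and CancellableDemo are hypothetical names used only for
illustration, and the single logical clock is an assumption; this is not the
KIP's API nor how PunctuationQueue is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Handle returned by schedule(); the two-method variant discussed in the thread. */
interface Cancellable {
    void cancel();
    boolean isCancelled();
}

/** Hypothetical stand-in for the scheduling part of a processor context. */
class ToyScheduler {
    private static final class Schedule implements Cancellable {
        final long interval;
        final LongConsumer callback;
        long nextFire;
        boolean cancelled;

        Schedule(long now, long interval, LongConsumer callback) {
            this.interval = interval;
            this.callback = callback;
            this.nextFire = now + interval;
        }

        @Override
        public void cancel() {
            cancelled = true;
        }

        @Override
        public boolean isCancelled() {
            return cancelled;
        }
    }

    private final List<Schedule> schedules = new ArrayList<>();
    private long now = 0L;

    /** Adds a new schedule (never overwrites an existing one) and returns its handle. */
    Cancellable schedule(long interval, LongConsumer callback) {
        Schedule s = new Schedule(now, interval, callback);
        schedules.add(s);
        return s;
    }

    /** Advances the logical clock and fires every schedule that is due and not cancelled. */
    void advanceTo(long time) {
        now = time;
        // Iterate over a copy, because a callback may call schedule() or cancel().
        for (Schedule s : new ArrayList<>(schedules)) {
            while (!s.cancelled && s.nextFire <= now) {
                s.callback.accept(now);
                s.nextFire += s.interval;
            }
        }
        schedules.removeIf(s -> s.cancelled);
    }
}

class CancellableDemo {
    /** Returns the times at which the callback fired. */
    static List<Long> run() {
        ToyScheduler scheduler = new ToyScheduler();
        List<Long> fired = new ArrayList<>();
        Cancellable handle = scheduler.schedule(10L, t -> fired.add(t));
        scheduler.advanceTo(10L); // due: fires
        scheduler.advanceTo(20L); // due: fires
        handle.cancel();
        scheduler.advanceTo(30L); // cancelled: does not fire
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [10, 20]
    }
}
```

With a handle like this, a callback can cancel the other schedule and
re-register it, which is all the "hybrid" example needs. Whether
isCancelled() is worth including is exactly the open question.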

I shall move all alternatives other than the main proposal into the 
Rejected Alternatives section and if I hear any objections, I'll move 
those back up and we'll discuss further.


Looking forward to all comments and suggestions.


Thanks,

Michal


On 01/05/17 18:23, Michal Borowiecki wrote:
>
> Hi all,
>
> As promised, here is my take on how one could implement the previously
> discussed hybrid semantics using the 2 PunctuationType callbacks (one 
> for STREAM_TIME and one for SYSTEM_TIME).
>
> However, there's a twist.
>
> Since currently calling context.schedule() adds a new 
> PunctuationSchedule and does not overwrite the previous one, a slight 
> change would be required:
>
> a) either that PunctuationSchedules are cancellable
>
> b) or that calling schedule() overwrites (cancels) the previous one
> with the given PunctuationType (but that's not how it works currently)
>
>
> Below is an example assuming approach a) is implemented by having 
> schedule return Cancellable instead of void.
>
> ProcessorContext context;
> long streamTimeInterval = ...;
> // e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
> long systemTimeUpperBound = ...;
> Cancellable streamTimeSchedule;
> Cancellable systemTimeSchedule;
> long lastStreamTimePunctuation = -1;
>
> public void init(ProcessorContext context) {
>     this.context = context;
>     streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
>         streamTimeInterval, this::streamTimePunctuate);
>     systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
>         systemTimeUpperBound, this::systemTimePunctuate);
> }
>
> public void streamTimePunctuate(long streamTime) {
>     periodicBusiness(streamTime);
>     systemTimeSchedule.cancel();
>     systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
>         systemTimeUpperBound, this::systemTimePunctuate);
> }
>
> public void systemTimePunctuate(long systemTime) {
>     periodicBusiness(context.timestamp());
>     streamTimeSchedule.cancel();
>     streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
>         streamTimeInterval, this::streamTimePunctuate);
> }
>
> public void periodicBusiness(long streamTime) {
>     // guard against streamTime == -1, easy enough.
>     // if you need system time instead, just use System.currentTimeMillis()
>
>     // do something businessy here
> }
>
> Where Cancellable is either an interface containing just a single void
> cancel() method or also boolean isCancelled() like here
> <http://doc.akka.io/japi/akka/2.5.0/akka/actor/Cancellable.html>.
>
>
> Please make your opinions known on whether we should proceed in this
> direction or leave "hybrid" considerations out of scope.
>
> Looking forward to hearing your thoughts.
>
> Thanks,
> Michal
>
> On 30/04/17 20:07, Michal Borowiecki wrote:
>>
>> Hi Matthias,
>>
>> I'd like to start moving the discarded ideas into the Rejected
>> Alternatives section. Before I do, I want to tidy them up and ensure
>> they've each been given proper treatment.
>>
>> To that end let me go back to one of your earlier comments about the 
>> original suggestion (A) to put that to bed.
>>
>>
>> On 04/04/17 06:44, Matthias J. Sax wrote:
>>> (A) You argue, that users can still "punctuate" on event-time via
>>> process(), but I am not sure if this is possible. Note, that users only
>>> get record timestamps via context.timestamp(). Thus, users would need to
>>> track the time progress per partition (based on the partitions they
>>> observe via context.partition()). (This alone puts a huge burden on the
>>> user by itself.) However, users are not notified at startup what
>>> partitions are assigned, and users are not notified when partitions get
>>> revoked. Because this information is not available, it's not possible to
>>> "manually advance" stream-time, and thus event-time punctuation within
>>> process() seems not to be possible -- or do you see a way to get it
>>> done? And even if so, it might still be too clumsy to use.
>> I might have missed something but I'm guessing your worry about users
>> having to track time progress /per partition/ comes from what
>> stream-time does currently.
>> But I'm not sure that those semantics of stream-time are ideal for 
>> users of punctuate.
>> That is, if stream-time punctuate didn't exist and users had to use 
>> process(), would they actually want to use the current semantics of 
>> stream time?
>>
>> As a reminder stream time, in all its glory, is (not exactly 
>> actually, but when trying to be absolutely precise here I spotted 
>> KAFKA-5144 <https://issues.apache.org/jira/browse/KAFKA-5144> so I 
>> think this approximation suffices to illustrate the point):
>>
>> a minimum across all input partitions of (
>>    if(msgs never received by partition) -1;
>>    else {
>>       a non-descending-minimum of ( the per-batch minimum msg timestamp)
>>    }
>> )
>>
>> Would that really be clear enough to the users of punctuate? Do they 
>> care for such a convoluted notion of time? I see how this can be 
>> useful for StreamTask to pick the next partition to take a record 
>> from but for punctuate?
>> If users had to implement punctuation with process(), is that what 
>> they would have chosen as their notion of time?
>> I'd argue not.
>>
>> None of the processors implementing the rich windowing/join 
>> operations in the DSL use punctuate.
>> Let's take the KStreamKStreamJoinProcessor as an example: in its
>> process() method it simply uses context().timestamp(), which, since
>> it's called from process(), returns, per the javadoc:
>> If it is triggered while processing a record streamed from the source 
>> processor, timestamp is defined as the timestamp of the current input 
>> record;
>> So they don't use that convoluted formula for stream-time. Instead, 
>> they only care about the timestamp of the current record. I think 
>> that having users track just that wouldn't be that much of a burden. 
>> I don't think they need to care about which partitions got assigned 
>> or not. And StreamTask would still be picking records first from the 
>> partition having the lowest timestamp to try to "synchronize" the 
>> streams as it does now.
>>
>> What users would have to do in their Processor implementations is 
>> somewhere along the lines of:
>>
>> long lastPunctuationTime = 0;
>> long interval = <some-number>; // millis
>>
>> @Override
>> public void process(K key, V value) {
>>     while (ctx.timestamp() >= lastPunctuationTime + interval) {
>>         punctuate(ctx.timestamp());
>>         // I'm not sure of the merit of this vs
>>         // lastPunctuationTime = ctx.timestamp(); but that's what
>>         // PunctuationQueue does currently
>>         lastPunctuationTime += interval;
>>     }
>>     // do some other business logic here
>> }
>>
>> Looking forward to your thoughts.
>>
>> Cheers,
>> Michal
>>
