Posted to dev@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2019/02/21 02:19:27 UTC

Re: [DISCUSS] KIP-424: Allow suppression of intermediate events based on wall clock time

Jonathan,

thanks for the KIP. Corner case question:

What happens if an application is stopped and restarted?

 - Should suppress() flush all records (would be _before_ the time elapsed)?
 - Or should it preserve buffered records and reload on restart? For
this case, should the record be flushed on reload (elapsed time is
unknown) or should we reset the timer to zero?


What is unclear to me atm is the use-case you anticipate. If you assume
a live run of an application, event-time and processing-time should be
fairly identical (at least with regard to data rates). Thus, suppress()
on event-time should give you about the same behavior as wall-clock
time? If you disagree, can you elaborate?

This leaves the case of data reprocessing, for which event-time advances
much faster than wall-clock time. Is this the target use-case?


About the implementation: checking wall-clock time is an expensive
system call, so I am a little worried about run-time overhead. This seems
not to be an implementation detail, and thus it might be worth
including it in the discussion. The question is how strict the guarantee
about when records are flushed should be. Assume you set a timer of 1
second, and you have a data rate of 1000 records per second, with
records arriving one ms apart, each with a different key. To
flush this data "correctly" we would need to check wall-clock time every
millisecond... Thoughts?

(We don't need to dive into all details, but a high level discussion
about the desired algorithm and guarantees would be good to have IMHO.)
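
To make the trade-off in the scenario above concrete, here is a minimal
Java sketch. It is a toy model, not Kafka Streams code: the class
FlushLateness and its parameters are hypothetical names introduced for
illustration. It assumes the clock is observed only at fixed intervals
and computes how late the worst-case flush would be.

```java
// Toy model of the 1-second-timer scenario; not Kafka Streams code.
final class FlushLateness {
    /**
     * Record i arrives at wall-clock time i ms and is due for flushing at
     * i + timerMs. The clock is only observed at multiples of
     * checkIntervalMs. Returns the worst-case delay between a record's due
     * time and the first clock observation at or after it.
     */
    static long maxLatenessMs(int records, long timerMs, long checkIntervalMs) {
        long worst = 0;
        for (int i = 0; i < records; i++) {
            long due = i + timerMs;
            // First observation time >= due (round up to a multiple).
            long observed =
                ((due + checkIntervalMs - 1) / checkIntervalMs) * checkIntervalMs;
            worst = Math.max(worst, observed - due);
        }
        return worst;
    }

    public static void main(String[] args) {
        System.out.println(maxLatenessMs(1000, 1000, 1));   // 0
        System.out.println(maxLatenessMs(1000, 1000, 100)); // 99
    }
}
```

Under these assumptions, checking every millisecond flushes every record
exactly on time, while checking every 100 ms can delay a flush by up to
99 ms. Whether that delay is acceptable is the contract question.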





-Matthias


On 1/30/19 12:16 PM, John Roesler wrote:
> Hi Jonathan,
> 
> Thanks for the KIP!
> 
> I think all the reviewers are heads-down right now reviewing code for the
> imminent 2.2 release, so this discussion may not get much traffic over the
> next couple of weeks. You might want to just keep bumping it once a week or
> so until people start finding time to review and respond.
> 
> Also, This message got marked as spam for me (which happens for mailing
> list messages sometimes, for some reason). I'm hoping that this response
> will hoist it into peoples' inboxes...
> 
> Thanks again for your work on this issue, and I look forward to the
> discussion!
> -John
> 
> On Wed, Jan 30, 2019 at 12:24 AM jonathangordon@newrelic.com <
> jonathangordon@newrelic.com> wrote:
> 
>> Hi all,
>>
>> I just published KIP-424: Allow suppression of intermediate events based
>> on wall clock time
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-424%3A+Allow+suppression+of+intermediate+events+based+on+wall+clock+time
>>
>> I am eager to hear your feedback and concerns. Thanks John Roesler for
>> your guidance in shaping my first KIP!
>>
>> I look forward to working with the Kafka community to see this through,
>>
>> Jonathan
>>
>>
>>
> 


Re: [DISCUSS] KIP-424: Allow suppression of intermediate events based on wall clock time

Posted by John Roesler <jo...@confluent.io>.
Hey Jonathan,

What's the status of this KIP? I was just in a discussion about
suppress, and it sparked my memory of this idea.

Thanks,
-John

On Mon, Mar 11, 2019 at 4:39 PM John Roesler <jo...@confluent.io> wrote:
>
> Thanks for the response, Matthias, I agree on both of these points.
>
> I didn't mean to question whether we should discuss it; we should, since as you point out both points affect the behavior of the API.
>
> Regarding checking system time,
> After reviewing the motivation of the KIP, it seems like a lower bound on how long to suppress updates should be sufficient. This is reinforced by the fact that the proposed behavior is to emit only when processing new data. Since this is the proposed behavior, it should be fine to use the system time we already checked at the start of the processing loop. Practically speaking, a "best effort lower bound"-type guarantee might be a good starting point. It gives us the flexibility to implement it efficiently, and we can always tighten the bound later, if there are requests to do so.
>
> Regarding flushing on shutdown, can you elaborate of the motivation for doing so?
>
> Thanks,
> -John
>
> On Mon, Mar 11, 2019 at 3:13 PM Matthias J. Sax <ma...@confluent.io> wrote:
>>
>> I agree that there are multiple ways how to avoid calling
>> `System.currentTimeMillis()`. However, the KIP needs to define the
>> public contract to explain users what behavior they can expect (the
>> simplest thing might be to say, it's based on `punctuation()` schedule
>> -- not sure if this is desired or not).
>>
>> Similarly, the question about "why should we slush on shutdown" is part
>> of the contract and multiple ways how to design it seem possible.
>>
>>
>>
>> -Matthias
>>
>> On 3/11/19 8:30 AM, John Roesler wrote:
>> > Hey, all, just chiming in to keep the discussion moving...
>> >
>> > Regarding whether to flush or not on shutdown, I'm curious why we *would*
>> > flush...
>> > The record cache does this, but that's because it's not durable. The
>> > suppression buffer is already backed by a changelog specifically so that it
>> > can provide exactly the timing you configure, and not have to emit early
>> > just because the commit interval is short or the task is migrated. So,
>> > regardless of the commit interval or application lifecycle, if I tell
>> > suppression to wait 5 minutes before emitting, it'll wait 5 minutes. It
>> > seems asymmetric for wall-clock suppression to behave differently.
>> >
>> > Regarding checking wall-clock time, yes, it can be expensive, but there are
>> > a number of ways we can cope with it without introducing a complicated
>> > algorithm:
>> > * use nano time
>> > * check the wall-clock once per batch and set it on the processor context
>> > in org.apache.kafka.streams.processor.internals.StreamThread#runOnce (we
>> > already check system time here anyway)
>> > * maybe just do the naive thing and measure the overhead. I.e., maybe we
>> > should benchmark the implementation anyway to look for this or other
>> > bottlenecks, and fix performance problem in the order they appear.
>> >
>> > Thoughts?
>> >
>> > Thanks,
>> > -John
>> >
>> > On Mon, Feb 25, 2019 at 4:36 PM jonathangordon@newrelic.com <
>> > jonathangordon@newrelic.com> wrote:
>> >
>> >> On 2019/02/21 02:19:27, "Matthias J. Sax" <ma...@confluent.io> wrote:
>> >>> thanks for the KIP. Corner case question:
>> >>>
>> >>> What happens if an application is stopped an restarted?
>> >>>
>> >>>  - Should suppress() flush all records (would be _before_ the time
>> >> elapsed)?
>> >>>  - Or should it preserve buffered records and reload on restart? For
>> >>> this case, should the record be flushed on reload (elapsed time is
>> >>> unknown) or should we reset the timer to zero?
>> >>
>> >> My opinion is that we should aim for simplicity for the first
>> >> implementation of this feature: Flush all the records on shutdown. If
>> >> there's demand in the future for strict adherence on shutdown we can
>> >> implement them as extra params to Suppressed api.
>> >>
>> >>> What is unclear to me atm, is the use-case you anticipate. If you assume
>> >>> a live run of an applications, event-time and processing-time should be
>> >>> fairly identical (at least with regard to data rates). Thus, suppress()
>> >>> on event-time should give you about the same behavior as wall-clock
>> >>> time? If you disagree, can you elaborate?
>> >>
>> >> Imagine a session window where you aggregate 10K events that usually occur
>> >> within 2-3 seconds of each other (event time). However, they are ingested
>> >> in batches of 1000 or so, spread out over 2-3 minutes (ingest time), and
>> >> not necessarily in order. It's important for us to be able to publish this
>> >> aggregate in real-time as we get new data (every 10 seconds) to keep our
>> >> time to glass low, but our data store is non-updateable so we'd like to
>> >> limit the number of aggregates we publish.
>> >>
>> >> If you imagine a case where all the event batches arrive in reverse order
>> >> for one particular session window, then once the stream time advances past
>> >> the suppression threshold, we could publish an aggregate update for each
>> >> newly received event.
>> >>
>> >>> This leave the case for data reprocessing, for which event-time advances
>> >>> much faster than wall-clock time. Is this the target use-case?
>> >>
>> >> No, see above.
>> >>
>> >>> About the implementation: checking wall-clock time is an expensive
>> >>> system call, so I am little worried about run-time overhead. This seems
>> >>> not to be an implementation detail and thus, it might be worth to
>> >>> includes is in the discussion. The question is, how strict the guarantee
>> >>> when records should be flushed should be. Assume you set a timer of 1
>> >>> seconds, and you have a data rate of 1000 records per second, with each
>> >>> record arriving one ms after the other all each with different key. To
>> >>> flush this data "correctly" we would need to check wall-clock time very
>> >>> millisecond... Thoughts?
>> >>>
>> >>> (We don't need to dive into all details, but a high level discussion
>> >>> about the desired algorithm and guarantees would be good to have IMHO.)
>> >>
>> >> I had never dug into the performance characteristics of
>> >> currentTimeMillis() before:
>> >>
>> >> http://pzemtsov.github.io/2017/07/23/the-slow-currenttimemillis.html
>> >>
>> >> So if we assume the slow Linux average of 640 ns/call, at a 1000 calls/sec
>> >> that's 0.64 ms. Doesn't seem terrible to me but I imagine for some use
>> >> cases 1M calls/sec might be reasonable and now we're up to 0.64s just in
>> >> system time checking. Perhaps we add some logic that calculates the rate of
>> >> data input and if it exceeds some threshold we only check the time every n
>> >> records? The trick there I suppose is for very bursty traffic you could
>> >> exceed and then wait too long to trigger another check. Maybe we store a
>> >> moving average? Or perhaps this is getting too complicated?
>> >>
>> >>
>> >>
>> >
>>

Re: [DISCUSS] KIP-424: Allow suppression of intermediate events based on wall clock time

Posted by jo...@newrelic.com.
I'd like to move this KIP along. To summarize where we've landed so far:

- Should suppress() flush all records (would be _before_ the time elapsed)?
- Or should it preserve buffered records and reload on restart? For
this case, should the record be flushed on reload (elapsed time is
unknown) or should we reset the timer to zero?

John Roesler suggested we preserve the buffered records, since suppression is backed by a durable store via the changelog. Works for me.

- How strict should the guarantee be about when records are flushed?

John Roesler suggested using the system time we already check at the start of the processing loop. This should avoid any performance hit from extra calls to System.currentTimeMillis(). Works for me.

I'm happy to add these details to the KIP if all agree. I also encourage any other questions before putting this to a vote.
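
A minimal Java sketch of the "reuse the time checked at the start of the processing loop" idea follows. LoopClock is a hypothetical helper, not an existing Kafka Streams class; it only assumes that something calls refresh() once per loop iteration.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not part of Kafka Streams: caches one clock reading
// per processing-loop iteration so per-record checks need no extra clock call.
final class LoopClock {
    private final LongSupplier source; // e.g. System::currentTimeMillis
    private long cachedMs;
    private long reads; // number of real clock reads, for illustration only

    LoopClock(LongSupplier source) {
        this.source = source;
        refresh();
    }

    /** Call once at the start of each loop iteration. */
    void refresh() {
        cachedMs = source.getAsLong();
        reads++;
    }

    /** Cheap per-record read; may lag real time by up to one iteration. */
    long nowMs() {
        return cachedMs;
    }

    long clockReads() {
        return reads;
    }
}
```

The cost of this design is staleness: every record in an iteration sees the same timestamp, so suppression can only promise a lower bound on how long a record is held.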

Thanks!

Re: [DISCUSS] KIP-424: Allow suppression of intermediate events based on wall clock time

Posted by jo...@newrelic.com.
On 2019/03/11 21:39:18, John Roesler <jo...@confluent.io> wrote: 
> Regarding flushing on shutdown, can you elaborate of the motivation for
> doing so?

I'm not in any way attached to this idea if you think preserving the buffered records isn't any additional trouble. It wasn't obvious to me how you would store the "wall clock time last flushed" in the changelog.

Re: [DISCUSS] KIP-424: Allow suppression of intermediate events based on wall clock time

Posted by John Roesler <jo...@confluent.io>.
Thanks for the response, Matthias, I agree on both of these points.

I didn't mean to question whether we should discuss it; we should, since as
you point out both points affect the behavior of the API.

Regarding checking system time:
after reviewing the motivation of the KIP, it seems like a lower bound on
how long to suppress updates should be sufficient. This is reinforced by
the fact that the proposed behavior is to emit only when processing new
data. Since this is the proposed behavior, it should be fine to use the
system time we already checked at the start of the processing loop.
Practically speaking, a "best effort lower bound"-type guarantee might be a
good starting point. It gives us the flexibility to implement it
efficiently, and we can always tighten the bound later, if there are
requests to do so.
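
A "best effort lower bound" buffer that emits only while processing new
data could be sketched in Java as below. LowerBoundSuppressor and all of
its members are hypothetical names, not Kafka Streams classes, and the
sketch assumes the caller passes in a wall-clock reading that never
decreases (for example, the one taken at the start of the processing
loop).

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Kafka Streams code.
final class LowerBoundSuppressor<K, V> {
    private static final class Held<V> {
        final long bufferedAtMs; // wall-clock time the key entered the buffer
        V value;                 // latest update for the key

        Held(long bufferedAtMs, V value) {
            this.bufferedAtMs = bufferedAtMs;
            this.value = value;
        }
    }

    private final long minSuppressMs;
    // Insertion order equals buffering order because nowMs never decreases.
    private final Map<K, Held<V>> buffer = new LinkedHashMap<>();

    LowerBoundSuppressor(long minSuppressMs) {
        this.minSuppressMs = minSuppressMs;
    }

    /**
     * Buffers the update, then returns (and removes) every key that has
     * been held for at least minSuppressMs as of nowMs. Nothing is emitted
     * unless a new record is processed.
     */
    Map<K, V> process(K key, V value, long nowMs) {
        Held<V> held = buffer.get(key);
        if (held == null) {
            buffer.put(key, new Held<>(nowMs, value));
        } else {
            held.value = value; // keep the original buffering time
        }
        Map<K, V> emitted = new LinkedHashMap<>();
        Iterator<Map.Entry<K, Held<V>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Held<V>> e = it.next();
            if (nowMs - e.getValue().bufferedAtMs < minSuppressMs) {
                break; // all later entries were buffered even more recently
            }
            emitted.put(e.getKey(), e.getValue().value);
            it.remove();
        }
        return emitted;
    }
}
```

Records can be held longer than the configured time (until the next
record arrives), but never shorter, which is the lower-bound guarantee.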

Regarding flushing on shutdown, can you elaborate on the motivation for
doing so?

Thanks,
-John

Re: [DISCUSS] KIP-424: Allow suppression of intermediate events based on wall clock time

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I agree that there are multiple ways to avoid calling
`System.currentTimeMillis()`. However, the KIP needs to define the
public contract to explain to users what behavior they can expect (the
simplest thing might be to say it's based on the `punctuation()` schedule
-- not sure if this is desired or not).

Similarly, the question of "why should we flush on shutdown" is part
of the contract, and there seem to be multiple ways to design it.



-Matthias


Re: [DISCUSS] KIP-424: Allow suppression of intermediate events based on wall clock time

Posted by John Roesler <jo...@confluent.io>.
Hey, all, just chiming in to keep the discussion moving...

Regarding whether to flush or not on shutdown, I'm curious why we *would*
flush...
The record cache does this, but that's because it's not durable. The
suppression buffer is already backed by a changelog specifically so that it
can provide exactly the timing you configure, and not have to emit early
just because the commit interval is short or the task is migrated. So,
regardless of the commit interval or application lifecycle, if I tell
suppression to wait 5 minutes before emitting, it'll wait 5 minutes. It
seems asymmetric for wall-clock suppression to behave differently.

Regarding checking wall-clock time, yes, it can be expensive, but there are
a number of ways we can cope with it without introducing a complicated
algorithm:
* use nano time
* check the wall-clock once per batch and set it on the processor context
in org.apache.kafka.streams.processor.internals.StreamThread#runOnce (we
already check system time here anyway)
* maybe just do the naive thing and measure the overhead. I.e., maybe we
should benchmark the implementation anyway to look for this or other
bottlenecks, and fix performance problems in the order they appear.
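
As a rough illustration of the "measure the overhead" option, the Java
sketch below times a loop of System.currentTimeMillis() calls using
System.nanoTime(). ClockCallCost is a hypothetical name, and a hand-rolled
loop like this is only indicative; a proper benchmark harness would give
more trustworthy numbers.

```java
// Rough, indicative measurement only; not a rigorous benchmark.
final class ClockCallCost {
    /** Average cost in nanoseconds of one System.currentTimeMillis() call. */
    static double avgNanosPerCall(int calls) {
        long sink = 0; // consume results so the loop is not optimized away
        long start = System.nanoTime();
        for (int i = 0; i < calls; i++) {
            sink += System.currentTimeMillis();
        }
        long elapsed = System.nanoTime() - start;
        if (sink == 42) {
            System.out.println("unlikely"); // keeps sink observable
        }
        return (double) elapsed / calls;
    }

    public static void main(String[] args) {
        avgNanosPerCall(1_000_000); // warm-up pass for the JIT
        System.out.printf("%.1f ns/call%n", avgNanosPerCall(1_000_000));
    }
}
```

The result depends heavily on the OS, clock source, and JVM, so it would
need to be measured on the target environment.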

Thoughts?

Thanks,
-John

Re: [DISCUSS] KIP-424: Allow suppression of intermediate events based on wall clock time

Posted by jo...@newrelic.com.
On 2019/02/21 02:19:27, "Matthias J. Sax" <ma...@confluent.io> wrote: 
> thanks for the KIP. Corner case question:
> 
> What happens if an application is stopped an restarted?
> 
>  - Should suppress() flush all records (would be _before_ the time elapsed)?
>  - Or should it preserve buffered records and reload on restart? For
> this case, should the record be flushed on reload (elapsed time is
> unknown) or should we reset the timer to zero?

My opinion is that we should aim for simplicity in the first implementation of this feature: flush all the records on shutdown. If there's demand in the future for strict adherence to the timer across shutdown, we can implement it as extra parameters to the Suppressed API.

> What is unclear to me atm, is the use-case you anticipate. If you assume
> a live run of an applications, event-time and processing-time should be
> fairly identical (at least with regard to data rates). Thus, suppress()
> on event-time should give you about the same behavior as wall-clock
> time? If you disagree, can you elaborate?

Imagine a session window where you aggregate 10K events that usually occur within 2-3 seconds of each other (event time). However, they are ingested in batches of 1000 or so, spread out over 2-3 minutes (ingest time), and not necessarily in order. It's important for us to be able to publish this aggregate in real-time as we get new data (every 10 seconds) to keep our time to glass low, but our data store is non-updateable so we'd like to limit the number of aggregates we publish.

If you imagine a case where all the event batches arrive in reverse order for one particular session window, then once the stream time advances past the suppression threshold, we could publish an aggregate update for each newly received event.
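
The difference can be illustrated with a small Java simulation. This is a toy model under stated assumptions, not Kafka Streams behavior: it assumes one session key, 10 batches of 1000 events arriving 15 seconds apart with events 1 ms apart inside a batch, that stream time has already passed the suppression threshold (so event-time suppression forwards every update), and that a wall-clock limit is only checked when new data arrives. SuppressionComparison and its methods are hypothetical names.

```java
// Toy simulation under the assumptions stated above; not Kafka Streams code.
final class SuppressionComparison {
    /** Wall-clock arrival times: batches gapMs apart, events 1 ms apart. */
    static long[] arrivals(int batches, int batchSize, long gapMs) {
        long[] out = new long[batches * batchSize];
        for (int b = 0; b < batches; b++) {
            for (int i = 0; i < batchSize; i++) {
                out[b * batchSize + i] = b * gapMs + i;
            }
        }
        return out;
    }

    /** Every update is forwarded immediately: one emission per event. */
    static int emitPerUpdate(long[] arrivalsMs) {
        return arrivalsMs.length;
    }

    /**
     * An update is forwarded only once the key has been buffered for at
     * least limitMs of wall-clock time, checked when a new event arrives.
     */
    static int emitWithWallClockLimit(long[] arrivalsMs, long limitMs) {
        int emitted = 0;
        long bufferedSince = -1; // -1 means nothing is buffered
        for (long now : arrivalsMs) {
            if (bufferedSince < 0) {
                bufferedSince = now;
            }
            if (now - bufferedSince >= limitMs) {
                emitted++;
                bufferedSince = -1;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] a = arrivals(10, 1000, 15_000);
        System.out.println(emitPerUpdate(a));                  // 10000
        System.out.println(emitWithWallClockLimit(a, 10_000)); // 9
    }
}
```

In this model the wall-clock limit cuts 10,000 published aggregates down to 9. Note that the tail of the last batch stays buffered until more data arrives, which is a consequence of emitting only when processing new records.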

> This leave the case for data reprocessing, for which event-time advances
> much faster than wall-clock time. Is this the target use-case?

No, see above.

> About the implementation: checking wall-clock time is an expensive
> system call, so I am little worried about run-time overhead. This seems
> not to be an implementation detail and thus, it might be worth to
> includes is in the discussion. The question is, how strict the guarantee
> when records should be flushed should be. Assume you set a timer of 1
> seconds, and you have a data rate of 1000 records per second, with each
> record arriving one ms after the other all each with different key. To
> flush this data "correctly" we would need to check wall-clock time very
> millisecond... Thoughts?
> 
> (We don't need to dive into all details, but a high level discussion
> about the desired algorithm and guarantees would be good to have IMHO.)

I had never dug into the performance characteristics of currentTimeMillis() before:

http://pzemtsov.github.io/2017/07/23/the-slow-currenttimemillis.html

So if we assume the slow Linux average of 640 ns/call, at 1000 calls/sec that's 0.64 ms of every second. That doesn't seem terrible to me, but I imagine for some use cases 1M calls/sec might be reasonable, and now we're up to 0.64 s of every second spent just checking the system time. Perhaps we add some logic that calculates the rate of data input, and if it exceeds some threshold we only check the time every n records? The trick there, I suppose, is that with very bursty traffic you could exceed the threshold and then wait too long to trigger another check. Maybe we store a moving average? Or perhaps this is getting too complicated?
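
The arithmetic above can be checked with a few lines of Java. ClockOverhead is a hypothetical name, the 640 ns/call figure is taken from the linked article, and the checkEveryN parameter models the "only check the time every n records" idea.

```java
// Worked arithmetic for the clock-check overhead; names are illustrative.
final class ClockOverhead {
    /**
     * Milliseconds spent on clock calls per second of processing, given the
     * cost of one call, the record rate, and how often the clock is checked.
     */
    static double overheadMsPerSecond(double nanosPerCall,
                                      long recordsPerSecond,
                                      int checkEveryN) {
        double callsPerSecond = recordsPerSecond / (double) checkEveryN;
        return nanosPerCall * callsPerSecond / 1_000_000.0; // ns -> ms
    }

    public static void main(String[] args) {
        System.out.println(overheadMsPerSecond(640, 1_000, 1));       // about 0.64 ms
        System.out.println(overheadMsPerSecond(640, 1_000_000, 1));   // about 640 ms
        System.out.println(overheadMsPerSecond(640, 1_000_000, 100)); // about 6.4 ms
    }
}
```

Checking once every 100 records at 1M records/sec would bring the cost down to roughly 6.4 ms per second, at the price of a coarser flush guarantee.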