Posted to users@kafka.apache.org by Liam Clarke <li...@adscale.co.nz> on 2020/04/15 03:43:33 UTC

Kafka Streams - issues with windowing and suppress

Hi all,

I have a case where I want to consume from a topic, count the number of
certain ids in a given time period X, and emit a new record to a different
topic after that same time period X has elapsed containing the aggregated
value.

I'm using suppress with Suppressed.untilWindowCloses, but nothing is ever
emitted, nor is my peek placed after the suppress ever being hit.
My code is in the Gist below - I've hardcoded the durations to 5 seconds
for testing purposes:
https://gist.github.com/LiamClarkeNZ/24121ccf0f09e4530749cbd92633fa46

I'm assuming I've misunderstood something drastically, and would greatly
appreciate a pointer on where I may have gone wrong. I'm wondering if I
need a larger retention on the persistent store?

I understand that events have to arrive in order for windows to close, so
I've sent events after the window has expired to attempt to move the window
on, and my first peek (before the suppression) is emitting as I do:

1. 2020-04-15T03:36:48.569Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
1. 2020-04-15T03:37:11.682Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
1. 2020-04-15T03:39:18.882Z aqgzftnvyn - 1


Any guidance gratefully appreciated.

Kind regards,

Liam Clarke

Re: Kafka Streams - issues with windowing and suppress

Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
Hi John,

Yep, I saw there were a few issues filed around default grace periods, and I
have a few ideas about sensible defaults and possible APIs. I'll sign up
for a Jira account in the next few days to join the discussion :)

Kind regards,

Liam Clarke-Hutchinson
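
Editorial sketch: two API directions come up in this thread. One is to deprecate the one-argument factory and add a factory that requires a grace period. The other is to keep the API as-is and log a WARN when no grace period is set. The Java below is only a rough illustration of how those could look side by side. The class WindowSpecSketch, its fields, and both of(...) overloads are hypothetical names invented here; this is not the Kafka Streams TimeWindows class, and the 24-hour fallback is an assumption based on the debugging observation reported in the thread.

```java
import java.time.Duration;
import java.util.logging.Logger;

// Hypothetical stand-in for a window definition; NOT the Kafka Streams API.
final class WindowSpecSketch {
    private static final Logger LOG = Logger.getLogger(WindowSpecSketch.class.getName());

    // Assumed legacy behaviour: windows are implicitly kept open for 24 hours in total.
    private static final Duration LEGACY_RETENTION = Duration.ofHours(24);

    final Duration size;
    final Duration grace;

    private WindowSpecSketch(Duration size, Duration grace) {
        if (size.isZero() || size.isNegative()) {
            throw new IllegalArgumentException("size must be positive");
        }
        if (grace.isNegative()) {
            throw new IllegalArgumentException("grace must not be negative");
        }
        this.size = size;
        this.grace = grace;
    }

    // Legacy-style factory: kept for compatibility, deprecated, and it warns
    // because the caller never chose a grace period.
    @Deprecated
    static WindowSpecSketch of(Duration size) {
        Duration implicit = LEGACY_RETENTION.minus(size);
        if (implicit.isNegative()) {
            implicit = Duration.ZERO;
        }
        LOG.warning("No grace period set; falling back to " + implicit.toMillis() + " ms");
        return new WindowSpecSketch(size, implicit);
    }

    // Proposed-style factory: the grace period is a mandatory argument.
    static WindowSpecSketch of(Duration size, Duration grace) {
        return new WindowSpecSketch(size, grace);
    }

    public static void main(String[] args) {
        WindowSpecSketch legacy = WindowSpecSketch.of(Duration.ofMillis(5000));
        WindowSpecSketch explicit = WindowSpecSketch.of(Duration.ofMillis(5000), Duration.ofMillis(100));
        System.out.println("legacy grace (ms):   " + legacy.grace.toMillis());
        System.out.println("explicit grace (ms): " + explicit.grace.toMillis());
    }
}
```

With a 5000 ms window, the legacy-style factory ends up with a grace of 86,395,000 ms, while the two-argument factory forces the caller to state the value they actually want.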


Re: Kafka Streams - issues with windowing and suppress

Posted by John Roesler <vv...@apache.org>.
Yes, thanks, Liam!

By the way, there's actually already a ticket to try and improve the API,
and the discussed solution is basically the same thing I said had never
occurred to me before, so I'm not sure what to say about that...

https://issues.apache.org/jira/browse/KAFKA-8924

The ticket seems abandoned, though, so it might be up for grabs if you
want to make sure it gets resolved asap. I'll add a comment with my
proposal.

Thanks,
-John


Re: Kafka Streams - issues with windowing and suppress

Posted by "Matthias J. Sax" <mj...@apache.org>.
Thanks for the PR!



Re: Kafka Streams - issues with windowing and suppress

Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
PR submitted :) https://github.com/apache/kafka/pull/8520


Re: Kafka Streams - issues with windowing and suppress

Posted by John Roesler <vv...@apache.org>.
Hi Liam,

That sounds like a good idea to me. In fact, I’d go so far as to say we should just change the existing example to include a grace period, and not bother with an extra example. That would put it front and center. 

A PR would be greatly appreciated! Thanks for the offer!

Thanks,
John

On Sun, Apr 19, 2020, at 19:58, Liam Clarke wrote:
> Hi Matthias,
> 
> I think as an interim measure, if the windowing samples in the docs showed
> an additional example where the grace period was set (with perhaps a
> comment about the current default grace period, and planned future
> changes?) it would make it sufficiently visible - happy to submit a PR with
> those changes if it seems appropriate.
> 
> Cheers,
> 
> Liam Clarke-Hutchinson
> 
> On Mon, Apr 20, 2020 at 12:12 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
> > I would prefer to not make the grace-period a mandatory argument and
> > keep the API as-is. I understand the issue of backward compatibility,
> > but I would still argue that we should just change the default grace
> > period to 0 in the 3.0 release. It's a major release and thus it seems
> > to be fine. To prepare for this change, we could start to log a WARN
> > message, if a user does not set the grace period explicitly for now.
> >
> > Just my 2 ct. Thoughts?
> >
> > -Matthias
> >
> > On 4/19/20 7:40 AM, John Roesler wrote:
> > > Oh, man, that’s a good idea.
> > >
> > > I can propose to deprecate (not remove) the existing ‘of’ factory method
> > > and add one with a mandatory grace period. Not sure why I didn’t think of
> > > that before. Probably too caught up in looking for something “smart”.
> > >
> > > Thanks!
> > > John
> > >
> > > On Sun, Apr 19, 2020, at 02:27, Liam Clarke wrote:
> > >> Hi John,
> > >>
> > >> I can't really think of a way to make it more obvious without breaking
> > >> backwards compatibility - e.g., obvious easy fix is that grace period is a
> > >> mandatory arg to TimeWindows, but that would definitely break compatibility.
> > >>
> > >> Cheers,
> > >>
> > >> Liam Clarke-Hutchinson
> > >>
> > >> On Thu, Apr 16, 2020 at 1:59 AM John Roesler <vv...@apache.org> wrote:
> > >>
> > >>> Boom, you got it, Liam! Nice debugging work.
> > >>>
> > >>> This is a pretty big bummer, but I had to do it that way for
> > >>> compatibility. I added a log message to try and help reduce the risk, but
> > >>> it’s still kind of a trap.
> > >>>
> > >>> I’d like to do a KIP at some point to consider changing the default grace
> > >>> period, but haven’t done it because it’s not clear what the default should
> > >>> be.
> > >>>
> > >>> Please let me know if you have any ideas!
> > >>> Thanks,
> > >>> -John
> > >>>
> > >>>
> > >>> On Tue, Apr 14, 2020, at 23:44, Liam Clarke wrote:
> > >>>> And the answer is to change
> > >>>> .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
> > >>>> and specify the grace period:
> > >>>>
> > >>>> .windowedBy(TimeWindows.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(100)))
> > >>>>
> > >>>> On Wed, Apr 15, 2020 at 4:34 PM Liam Clarke <liam.clarke@adscale.co.nz> wrote:
> > >>>>
> > >>>>> Okay, doing some debugging it looks like I'm seeing this behaviour because
> > >>>>> it's picking up a grace duration of 86,395,000 ms in
> > >>>>> KTableImpl.buildSuppress, which happens to be 5000 millis (my window
> > >>>>> size) short of 24 hours, so I've got some clues!
>

Re: Kafka Streams - issues with windowing and suppress

Posted by Liam Clarke <li...@adscale.co.nz>.
Hi Matthias,

I think as an interim measure, if the windowing samples in the docs showed
an additional example where the grace period was set (with perhaps a
comment about the current default grace period, and planned future
changes?) it would make it sufficiently visible - happy to submit a PR with
those changes if it seems appropriate.

Cheers,

Liam Clarke-Hutchinson

On Mon, Apr 20, 2020 at 12:12 PM Matthias J. Sax <mj...@apache.org> wrote:

> I would prefer to not make the grace-period a mandatory argument and
> keep the API as-is. I understand the issue of backward compatibility,
> but I would still argue that we should just change the default grace
> period to 0 in the 3.0 release. It's a major release and thus it seems
> to be fine. To prepare for this change, we could start to log a WARN
> message, if a user does not set the grace period explicitly for now.
>
> Just my 2 ct. Thoughts?
>
> -Matthias
>
> On 4/19/20 7:40 AM, John Roesler wrote:
> > Oh, man, that’s a good idea.
> >
> > I can propose to deprecate (not remove) the existing ‘of’ factory method
> and add one with a mandatory grace period. Not sure why I didn’t think of
> that before. Probably too caught up in looking for something “smart”.
> >
> > Thanks!
> > John
> >
> > On Sun, Apr 19, 2020, at 02:27, Liam Clarke wrote:
> >> Hi John,
> >>
> >> I can't really think of a way to make it more obvious without breaking
> >> backwards compatibility - e.g., obvious easy fix is that grace period
> is a
> >> mandatory arg to TimeWindows, but that would definitely break
> compatibility.
> >>
> >> Cheers,
> >>
> >> Liam Clarke-Hutchinson
> >>
> >> On Thu, Apr 16, 2020 at 1:59 AM John Roesler <vv...@apache.org>
> wrote:
> >>
> >>> Boom, you got it, Liam! Nice debugging work.
> >>>
> >>> This is a pretty big bummer, but I had to do it that way for
> >>> compatibility. I added a log message to try and help reduce the risk,
> but
> >>> it’s still kind of a trap.
> >>>
> >>> I’d like to do a KIP at some point to consider changing the default
> grace
> >>> period, but haven’t done it because it’s not clear what the default
> should
> >>> be.
> >>>
> >>> Please let me know if you have any ideas!
> >>> Thanks,
> >>> -John
> >>>
> >>>
> >>> On Tue, Apr 14, 2020, at 23:44, Liam Clarke wrote:
> >>>> And the answer is to change
> >>>> .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
> >>>> and specify the grace period:
> >>>>
> >>>>
> >>>
> windowedBy(TimeWindows.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(100)))
> >>>>
> >>>> On Wed, Apr 15, 2020 at 4:34 PM Liam Clarke <
> liam.clarke@adscale.co.nz>
> >>>> wrote:
> >>>>
> >>>>> Okay, doing some debugging it looks like I'm seeing this behaviour
> >>> because
> >>>>> it's picking up a grace duration of 86,395,000 ms in
> >>>>> KTableImpl.buildSuppress, which would happen to be  5000 millis (my
> >>> window
> >>>>> size) off 24 hours, so I've got some clues!
> >>>>>
> >>>>> On Wed, Apr 15, 2020 at 3:43 PM Liam Clarke <
> liam.clarke@adscale.co.nz
> >>>>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> I have a case where I want to consume from a topic, count the number
> >>> of
> >>>>>> certain ids in a given time period X, and emit a new record to a
> >>> different
> >>>>>> topic after that same time period X has elapsed containing the
> >>> aggregated
> >>>>>> value.
> >>>>>>
> >>>>>> I'm using suppress with Suppressed.untilWindowCloses, but nothing is
> >>> ever
> >>>>>> emitted, nor is my peek placed after the suppress ever being hit.
> >>>>>> My code is in the below Gist - I've hardcoded the durations for 5
> >>> seconds
> >>>>>> after testing purposes:
> >>>>>>
> https://gist.github.com/LiamClarkeNZ/24121ccf0f09e4530749cbd92633fa46
> >>>>>>
> >>>>>> I'm assuming I've misunderstood something drastically, and would
> >>> greatly
> >>>>>> appreciate a pointer on where I may have gone wrong. I'm wondering
> if
> >>> I
> >>>>>> need a larger retention on the persistent store?
> >>>>>>
> >>>>>> I understand that events have to arrive in order for windows to
> >>> close, so
> >>>>>> I've sent events after the window has expired to attempt to move the
> >>> window
> >>>>>> on, and my first peek (before the suppression) is emitting as I do:
> >>>>>>
> >>>>>> 1. 2020-04-15T03:36:48.569Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
> >>>>>> 1. 2020-04-15T03:37:11.682Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
> >>>>>> 1. 2020-04-15T03:39:18.882Z aqgzftnvyn - 1
> >>>>>>
> >>>>>>
> >>>>>>  Any guidance greatfully appreciated.
> >>>>>>
> >>>>>> Kind regards,
> >>>>>>
> >>>>>> Liam Clarke
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>

Re: Kafka Streams - issues with windowing and suppress

Posted by "Matthias J. Sax" <mj...@apache.org>.
I would prefer to not make the grace-period a mandatory argument and
keep the API as-is. I understand the issue of backward compatibility,
but I would still argue that we should just change the default grace
period to 0 in the 3.0 release. It's a major release and thus it seems
to be fine. To prepare for this change, we could start to log a WARN
message, if a user does not set the grace period explicitly for now.

Just my 2 ct. Thoughts?

-Matthias

On 4/19/20 7:40 AM, John Roesler wrote:
> Oh, man, that’s a good idea.
> 
> I can propose to deprecate (not remove) the existing ‘of’ factory method and add one with a mandatory grace period. Not sure why I didn’t think of that before. Probably too caught up in looking for something “smart”.
> 
> Thanks!
> John
> 
> On Sun, Apr 19, 2020, at 02:27, Liam Clarke wrote:
>> Hi John,
>>
>> I can't really think of a way to make it more obvious without breaking
>> backwards compatibility - e.g., obvious easy fix is that grace period is a
>> mandatory arg to TimeWindows, but that would definitely break compatibility.
>>
>> Cheers,
>>
>> Liam Clarke-Hutchinson
>>
>> On Thu, Apr 16, 2020 at 1:59 AM John Roesler <vv...@apache.org> wrote:
>>
>>> Boom, you got it, Liam! Nice debugging work.
>>>
>>> This is a pretty big bummer, but I had to do it that way for
>>> compatibility. I added a log message to try and help reduce the risk, but
>>> it’s still kind of a trap.
>>>
>>> I’d like to do a KIP at some point to consider changing the default grace
>>> period, but haven’t done it because it’s not clear what the default should
>>> be.
>>>
>>> Please let me know if you have any ideas!
>>> Thanks,
>>> -John
>>>
>>>
>>> On Tue, Apr 14, 2020, at 23:44, Liam Clarke wrote:
>>>> And the answer is to change
>>>> .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
>>>> and specify the grace period:
>>>>
>>>>
>>> windowedBy(TimeWindows.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(100)))
>>>>
>>>> On Wed, Apr 15, 2020 at 4:34 PM Liam Clarke <li...@adscale.co.nz>
>>>> wrote:
>>>>
>>>>> Okay, doing some debugging it looks like I'm seeing this behaviour
>>> because
>>>>> it's picking up a grace duration of 86,395,000 ms in
>>>>> KTableImpl.buildSuppress, which would happen to be  5000 millis (my
>>> window
>>>>> size) off 24 hours, so I've got some clues!
>>>>>
>>>>> On Wed, Apr 15, 2020 at 3:43 PM Liam Clarke <liam.clarke@adscale.co.nz
>>>>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have a case where I want to consume from a topic, count the number
>>> of
>>>>>> certain ids in a given time period X, and emit a new record to a
>>> different
>>>>>> topic after that same time period X has elapsed containing the
>>> aggregated
>>>>>> value.
>>>>>>
>>>>>> I'm using suppress with Suppressed.untilWindowCloses, but nothing is
>>> ever
>>>>>> emitted, nor is my peek placed after the suppress ever being hit.
>>>>>> My code is in the below Gist - I've hardcoded the durations for 5
>>> seconds
>>>>>> after testing purposes:
>>>>>> https://gist.github.com/LiamClarkeNZ/24121ccf0f09e4530749cbd92633fa46
>>>>>>
>>>>>> I'm assuming I've misunderstood something drastically, and would
>>> greatly
>>>>>> appreciate a pointer on where I may have gone wrong. I'm wondering if
>>> I
>>>>>> need a larger retention on the persistent store?
>>>>>>
>>>>>> I understand that events have to arrive in order for windows to
>>> close, so
>>>>>> I've sent events after the window has expired to attempt to move the
>>> window
>>>>>> on, and my first peek (before the suppression) is emitting as I do:
>>>>>>
>>>>>> 1. 2020-04-15T03:36:48.569Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
>>>>>> 1. 2020-04-15T03:37:11.682Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
>>>>>> 1. 2020-04-15T03:39:18.882Z aqgzftnvyn - 1
>>>>>>
>>>>>>
>>>>>>  Any guidance greatfully appreciated.
>>>>>>
>>>>>> Kind regards,
>>>>>>
>>>>>> Liam Clarke
>>>>>>
>>>>>
>>>>
>>>
>>


Re: Kafka Streams - issues with windowing and suppress

Posted by John Roesler <vv...@apache.org>.
Oh, man, that’s a good idea.

I can propose to deprecate (not remove) the existing ‘of’ factory method and add one with a mandatory grace period. Not sure why I didn’t think of that before. Probably too caught up in looking for something “smart”.

Thanks!
John
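
For illustration, the "deprecate 'of' and add a factory with a mandatory grace period" idea might look roughly like the sketch below. This is not the Kafka Streams API: WindowSpecSketch and ofWithGrace are hypothetical names made up for this example, and the implicit 24-hour figure is taken from the debugging notes quoted further down.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for a window definition; NOT the real TimeWindows class.
final class WindowSpecSketch {
    final Duration size;
    final Duration grace;

    private WindowSpecSketch(Duration size, Duration grace) {
        Objects.requireNonNull(size, "size");
        Objects.requireNonNull(grace, "grace");
        if (size.isZero() || size.isNegative()) {
            throw new IllegalArgumentException("window size must be positive");
        }
        if (grace.isNegative()) {
            throw new IllegalArgumentException("grace period must not be negative");
        }
        this.size = size;
        this.grace = grace;
    }

    // Old-style factory, kept for compatibility but flagged as deprecated.
    // The implicit grace is assumed to be 24 hours minus the window size.
    @Deprecated
    static WindowSpecSketch of(Duration size) {
        Duration implicit = Duration.ofHours(24).minus(size);
        return new WindowSpecSketch(size, implicit.isNegative() ? Duration.ZERO : implicit);
    }

    // Hypothetical replacement: the caller has to choose a grace period.
    static WindowSpecSketch ofWithGrace(Duration size, Duration grace) {
        return new WindowSpecSketch(size, grace);
    }
}
```

With a shape like this, existing callers keep compiling (with a deprecation warning), while new code cannot create a window without deciding on a grace period.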

On Sun, Apr 19, 2020, at 02:27, Liam Clarke wrote:
> Hi John,
> 
> I can't really think of a way to make it more obvious without breaking
> backwards compatibility - e.g., obvious easy fix is that grace period is a
> mandatory arg to TimeWindows, but that would definitely break compatibility.
> 
> Cheers,
> 
> Liam Clarke-Hutchinson
> 
> On Thu, Apr 16, 2020 at 1:59 AM John Roesler <vv...@apache.org> wrote:
> 
> > Boom, you got it, Liam! Nice debugging work.
> >
> > This is a pretty big bummer, but I had to do it that way for
> > compatibility. I added a log message to try and help reduce the risk, but
> > it’s still kind of a trap.
> >
> > I’d like to do a KIP at some point to consider changing the default grace
> > period, but haven’t done it because it’s not clear what the default should
> > be.
> >
> > Please let me know if you have any ideas!
> > Thanks,
> > -John
> >
> >
> > On Tue, Apr 14, 2020, at 23:44, Liam Clarke wrote:
> > > And the answer is to change
> > > .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
> > > and specify the grace period:
> > >
> > >
> > windowedBy(TimeWindows.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(100)))
> > >
> > > On Wed, Apr 15, 2020 at 4:34 PM Liam Clarke <li...@adscale.co.nz>
> > > wrote:
> > >
> > > > Okay, doing some debugging it looks like I'm seeing this behaviour
> > because
> > > > it's picking up a grace duration of 86,395,000 ms in
> > > > KTableImpl.buildSuppress, which would happen to be  5000 millis (my
> > window
> > > > size) off 24 hours, so I've got some clues!
> > > >
> > > > On Wed, Apr 15, 2020 at 3:43 PM Liam Clarke <liam.clarke@adscale.co.nz
> > >
> > > > wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> I have a case where I want to consume from a topic, count the number
> > of
> > > >> certain ids in a given time period X, and emit a new record to a
> > different
> > > >> topic after that same time period X has elapsed containing the
> > aggregated
> > > >> value.
> > > >>
> > > >> I'm using suppress with Suppressed.untilWindowCloses, but nothing is
> > ever
> > > >> emitted, nor is my peek placed after the suppress ever being hit.
> > > >> My code is in the below Gist - I've hardcoded the durations for 5
> > seconds
> > > >> after testing purposes:
> > > >> https://gist.github.com/LiamClarkeNZ/24121ccf0f09e4530749cbd92633fa46
> > > >>
> > > >> I'm assuming I've misunderstood something drastically, and would
> > greatly
> > > >> appreciate a pointer on where I may have gone wrong. I'm wondering if
> > I
> > > >> need a larger retention on the persistent store?
> > > >>
> > > >> I understand that events have to arrive in order for windows to
> > close, so
> > > >> I've sent events after the window has expired to attempt to move the
> > window
> > > >> on, and my first peek (before the suppression) is emitting as I do:
> > > >>
> > > >> 1. 2020-04-15T03:36:48.569Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
> > > >> 1. 2020-04-15T03:37:11.682Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
> > > >> 1. 2020-04-15T03:39:18.882Z aqgzftnvyn - 1
> > > >>
> > > >>
> > > >>  Any guidance greatfully appreciated.
> > > >>
> > > >> Kind regards,
> > > >>
> > > >> Liam Clarke
> > > >>
> > > >
> > >
> >
>

Re: Kafka Streams - issues with windowing and suppress

Posted by Liam Clarke <li...@adscale.co.nz>.
Hi John,

I can't really think of a way to make it more obvious without breaking
backwards compatibility - e.g., the obvious easy fix is to make the grace
period a mandatory arg to TimeWindows, but that would definitely break
compatibility.

Cheers,

Liam Clarke-Hutchinson

On Thu, Apr 16, 2020 at 1:59 AM John Roesler <vv...@apache.org> wrote:

> Boom, you got it, Liam! Nice debugging work.
>
> This is a pretty big bummer, but I had to do it that way for
> compatibility. I added a log message to try and help reduce the risk, but
> it’s still kind of a trap.
>
> I’d like to do a KIP at some point to consider changing the default grace
> period, but haven’t done it because it’s not clear what the default should
> be.
>
> Please let me know if you have any ideas!
> Thanks,
> -John
>
>
> On Tue, Apr 14, 2020, at 23:44, Liam Clarke wrote:
> > And the answer is to change
> > .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
> > and specify the grace period:
> >
> >
> windowedBy(TimeWindows.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(100)))
> >
> > On Wed, Apr 15, 2020 at 4:34 PM Liam Clarke <li...@adscale.co.nz>
> > wrote:
> >
> > > Okay, doing some debugging it looks like I'm seeing this behaviour
> because
> > > it's picking up a grace duration of 86,395,000 ms in
> > > KTableImpl.buildSuppress, which would happen to be  5000 millis (my
> window
> > > size) off 24 hours, so I've got some clues!
> > >
> > > On Wed, Apr 15, 2020 at 3:43 PM Liam Clarke <liam.clarke@adscale.co.nz
> >
> > > wrote:
> > >
> > >> Hi all,
> > >>
> > >> I have a case where I want to consume from a topic, count the number
> of
> > >> certain ids in a given time period X, and emit a new record to a
> different
> > >> topic after that same time period X has elapsed containing the
> aggregated
> > >> value.
> > >>
> > >> I'm using suppress with Suppressed.untilWindowCloses, but nothing is
> ever
> > >> emitted, nor is my peek placed after the suppress ever being hit.
> > >> My code is in the below Gist - I've hardcoded the durations for 5
> seconds
> > >> after testing purposes:
> > >> https://gist.github.com/LiamClarkeNZ/24121ccf0f09e4530749cbd92633fa46
> > >>
> > >> I'm assuming I've misunderstood something drastically, and would
> greatly
> > >> appreciate a pointer on where I may have gone wrong. I'm wondering if
> I
> > >> need a larger retention on the persistent store?
> > >>
> > >> I understand that events have to arrive in order for windows to
> close, so
> > >> I've sent events after the window has expired to attempt to move the
> window
> > >> on, and my first peek (before the suppression) is emitting as I do:
> > >>
> > >> 1. 2020-04-15T03:36:48.569Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
> > >> 1. 2020-04-15T03:37:11.682Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
> > >> 1. 2020-04-15T03:39:18.882Z aqgzftnvyn - 1
> > >>
> > >>
> > >>  Any guidance greatfully appreciated.
> > >>
> > >> Kind regards,
> > >>
> > >> Liam Clarke
> > >>
> > >
> >
>

Re: Kafka Streams - issues with windowing and suppress

Posted by John Roesler <vv...@apache.org>.
Boom, you got it, Liam! Nice debugging work. 

This is a pretty big bummer, but I had to do it that way for compatibility. I added a log message to try and help reduce the risk, but it’s still kind of a trap. 

I’d like to do a KIP at some point to consider changing the default grace period, but haven’t done it because it’s not clear what the default should be. 

Please let me know if you have any ideas!
Thanks,
-John


On Tue, Apr 14, 2020, at 23:44, Liam Clarke wrote:
> And the answer is to change
> .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
> and specify the grace period:
>  
> windowedBy(TimeWindows.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(100)))
> 
> On Wed, Apr 15, 2020 at 4:34 PM Liam Clarke <li...@adscale.co.nz>
> wrote:
> 
> > Okay, doing some debugging it looks like I'm seeing this behaviour because
> > it's picking up a grace duration of 86,395,000 ms in
> > KTableImpl.buildSuppress, which would happen to be  5000 millis (my window
> > size) off 24 hours, so I've got some clues!
> >
> > On Wed, Apr 15, 2020 at 3:43 PM Liam Clarke <li...@adscale.co.nz>
> > wrote:
> >
> >> Hi all,
> >>
> >> I have a case where I want to consume from a topic, count the number of
> >> certain ids in a given time period X, and emit a new record to a different
> >> topic after that same time period X has elapsed containing the aggregated
> >> value.
> >>
> >> I'm using suppress with Suppressed.untilWindowCloses, but nothing is ever
> >> emitted, nor is my peek placed after the suppress ever being hit.
> >> My code is in the below Gist - I've hardcoded the durations for 5 seconds
> >> after testing purposes:
> >> https://gist.github.com/LiamClarkeNZ/24121ccf0f09e4530749cbd92633fa46
> >>
> >> I'm assuming I've misunderstood something drastically, and would greatly
> >> appreciate a pointer on where I may have gone wrong. I'm wondering if I
> >> need a larger retention on the persistent store?
> >>
> >> I understand that events have to arrive in order for windows to close, so
> >> I've sent events after the window has expired to attempt to move the window
> >> on, and my first peek (before the suppression) is emitting as I do:
> >>
> >> 1. 2020-04-15T03:36:48.569Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
> >> 1. 2020-04-15T03:37:11.682Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
> >> 1. 2020-04-15T03:39:18.882Z aqgzftnvyn - 1
> >>
> >>
> >>  Any guidance greatfully appreciated.
> >>
> >> Kind regards,
> >>
> >> Liam Clarke
> >>
> >
>

Re: Kafka Streams - issues with windowing and suppress

Posted by Liam Clarke <li...@adscale.co.nz>.
And the answer is to change
.windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
to specify the grace period explicitly:
.windowedBy(TimeWindows.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(100)))
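
To see why the explicit grace period makes the difference, here is a small standalone model of when a window counts as closed. It does not use Kafka Streams at all: WindowCloseSketch and its methods are hypothetical, and the rule "closed once stream time reaches window end plus grace" is a simplified assumption about how Suppressed.untilWindowCloses decides to emit. The timestamps are the ones from the peek output in the original message.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified, hypothetical model of window closing; not Kafka Streams code.
final class WindowCloseSketch {

    // Start of the epoch-aligned tumbling window that contains timestampMs.
    static long windowStartMs(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Assumed rule: the window is closed once stream time has reached
    // window end + grace period.
    static boolean isClosed(long windowStartMs, long sizeMs, long graceMs, long streamTimeMs) {
        return windowStartMs + sizeMs + graceMs <= streamTimeMs;
    }

    public static void main(String[] args) {
        long sizeMs = Duration.ofSeconds(5).toMillis();
        long firstEvent = Instant.parse("2020-04-15T03:36:48.569Z").toEpochMilli();
        long latestEvent = Instant.parse("2020-04-15T03:39:18.882Z").toEpochMilli();
        long start = windowStartMs(firstEvent, sizeMs);

        // Grace that was picked up implicitly: 24 hours minus the window size.
        long implicitGraceMs = Duration.ofDays(1).toMillis() - sizeMs;

        System.out.println("implicit grace, closed: "
                + isClosed(start, sizeMs, implicitGraceMs, latestEvent));
        System.out.println("100 ms grace, closed:   "
                + isClosed(start, sizeMs, 100L, latestEvent));
    }
}
```

Under this model the first window stays open for almost a day with the implicit grace, so the later events never advance stream time far enough for suppress to emit; with a 100 ms grace the same events close it.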

On Wed, Apr 15, 2020 at 4:34 PM Liam Clarke <li...@adscale.co.nz>
wrote:

> Okay, doing some debugging it looks like I'm seeing this behaviour because
> it's picking up a grace duration of 86,395,000 ms in
> KTableImpl.buildSuppress, which would happen to be  5000 millis (my window
> size) off 24 hours, so I've got some clues!
>
> On Wed, Apr 15, 2020 at 3:43 PM Liam Clarke <li...@adscale.co.nz>
> wrote:
>
>> Hi all,
>>
>> I have a case where I want to consume from a topic, count the number of
>> certain ids in a given time period X, and emit a new record to a different
>> topic after that same time period X has elapsed containing the aggregated
>> value.
>>
>> I'm using suppress with Suppressed.untilWindowCloses, but nothing is ever
>> emitted, nor is my peek placed after the suppress ever being hit.
>> My code is in the below Gist - I've hardcoded the durations for 5 seconds
>> after testing purposes:
>> https://gist.github.com/LiamClarkeNZ/24121ccf0f09e4530749cbd92633fa46
>>
>> I'm assuming I've misunderstood something drastically, and would greatly
>> appreciate a pointer on where I may have gone wrong. I'm wondering if I
>> need a larger retention on the persistent store?
>>
>> I understand that events have to arrive in order for windows to close, so
>> I've sent events after the window has expired to attempt to move the window
>> on, and my first peek (before the suppression) is emitting as I do:
>>
>> 1. 2020-04-15T03:36:48.569Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
>> 1. 2020-04-15T03:37:11.682Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
>> 1. 2020-04-15T03:39:18.882Z aqgzftnvyn - 1
>>
>>
>>  Any guidance greatfully appreciated.
>>
>> Kind regards,
>>
>> Liam Clarke
>>
>

Re: Kafka Streams - issues with windowing and suppress

Posted by Liam Clarke <li...@adscale.co.nz>.
Okay, after some debugging, it looks like I'm seeing this behaviour because
it's picking up a grace duration of 86,395,000 ms in
KTableImpl.buildSuppress, which happens to be 24 hours minus 5000 ms (my
window size), so I've got some clues!
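
The arithmetic behind that number can be checked quickly. The sketch below only reproduces the subtraction; the 24-hour figure is the value inferred above from the debugger, and DefaultGraceArithmetic is a hypothetical helper, not part of Kafka Streams.

```java
import java.time.Duration;

// Hypothetical helper that reproduces the observed grace value.
final class DefaultGraceArithmetic {

    // Assumption: the implicit grace is derived from a 24-hour default.
    static final Duration ASSUMED_DEFAULT = Duration.ofHours(24);

    static long impliedGraceMs(Duration windowSize) {
        return ASSUMED_DEFAULT.minus(windowSize).toMillis();
    }

    public static void main(String[] args) {
        // 86,400,000 ms - 5,000 ms
        System.out.println(impliedGraceMs(Duration.ofMillis(5000))); // prints 86395000
    }
}
```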

On Wed, Apr 15, 2020 at 3:43 PM Liam Clarke <li...@adscale.co.nz>
wrote:

> Hi all,
>
> I have a case where I want to consume from a topic, count the number of
> certain ids in a given time period X, and emit a new record to a different
> topic after that same time period X has elapsed containing the aggregated
> value.
>
> I'm using suppress with Suppressed.untilWindowCloses, but nothing is ever
> emitted, nor is my peek placed after the suppress ever being hit.
> My code is in the below Gist - I've hardcoded the durations for 5 seconds
> after testing purposes:
> https://gist.github.com/LiamClarkeNZ/24121ccf0f09e4530749cbd92633fa46
>
> I'm assuming I've misunderstood something drastically, and would greatly
> appreciate a pointer on where I may have gone wrong. I'm wondering if I
> need a larger retention on the persistent store?
>
> I understand that events have to arrive in order for windows to close, so
> I've sent events after the window has expired to attempt to move the window
> on, and my first peek (before the suppression) is emitting as I do:
>
> 1. 2020-04-15T03:36:48.569Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
> 1. 2020-04-15T03:37:11.682Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
> 1. 2020-04-15T03:39:18.882Z aqgzftnvyn - 1
>
>
>  Any guidance greatfully appreciated.
>
> Kind regards,
>
> Liam Clarke
>