You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by John Roesler <jo...@confluent.io> on 2018/07/02 15:23:20 UTC

Re: [DISCUSS] KIP-328: Ability to suppress updates for KTables

Hi Guozhang,

Thanks for the clarification.

To answer your questions:
1. Yes, specifically Y < X makes sense and is by design.

The scenario is to support IQ queries over windows that are closed but not
evicted. For example, suppose we have a metrics application backed by
Streams. Let's say we do time windows of 1 minute with retention until 30
days, and we compute the final event after an additional 5 minutes.

When you load the app, it builds the graphs from the IQ stores and then
proceeds to live update the page using final updates + foreach. If we
didn't support Y < X, this use case wouldn't work at all.


As far as X < Y, goes, I think that we should set Y = max(Y, X -
windowSize). This is semantically harmless, and gives the best
responsiveness, since that's the earliest time we know the window result is
"final".


Altogether, it seems like you think that people mostly set the window
retention time tightly to when they would want to close the window and emit
a final event, whereas I think that people would set retention time much
greater than than the latest they expect to see an event when they are
using IQ. You have much more experience with this domain than I do, though,
so I feel I'll have to defer to your instincts.



2. I believe so. In my response to Matthias, I gave the example of making
Streams ignore out-of-order events with "suppressLateEvents(Duration.ZERO)".
This was actually something I wished for in the past. More generally, it
seems plausible to me that bounded lateness would be a useful invariant for
some kinds of custom processors or foreach blocks.



I've also been thinking this weekend about the concerns you raised, and I
have some more thoughts. I'll send a separate reply to keep the messages
short.

Thanks for helping to hash this out,
-john


On Sun, Jul 1, 2018 at 11:24 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hi John,
>
> Regarding the metrics: yeah I think I'm with you that the dropped records
> due to window retention or emit suppression policies should be recorded
> differently, and using this KIP's proposed metric would be fine. If you
> also think we can use this KIP's proposed metrics to cover the window
> retention cased skipping records, then we can include the changes in this
> KIP as well.
>
> Regarding the current proposal, I'm actually not too worried about the
> inconsistency between query semantics and downstream emit semantics. For
> queries, we will always return the current running results of the windows,
> being it partial or final results depending on the window retention time
> anyways, which has nothing to do whether the emitted stream should be one
> final output per key or not. I also agree that having a unified operation
> is generally better for users to focus on leveraging that one only than
> learning about two set of operations. The only question I had is, for final
> updates of window stores, if it is a bit awkward to understand the
> configuration combo. Thinking about this more, I think my root worry in the
> "suppressLateEvents" call for windowed tables, since from a user
> perspective: if my retention time is X which means "pay the cost to allow
> late records up to X to still be applied updating the tables", why would I
> ever want to suppressLateEvents by Y ( < X), to say "do not send the
> updates up to Y, which means the downstream operator or sink topic for this
> stream would actually see a truncated update stream while I've paid larger
> cost for that"; and of course, Y > X would not make sense either as you
> would not see any updates later than X anyways. So in all, my feeling is
> that it makes less sense for windowed table's "suppressLateEvents" with a
> parameter that is not equal to the window retention, and opening the door
> in the current proposal may confuse people with that.
>
> Again, above is just a subjective opinion and probably we can also bring up
> some scenarios that users does want to set X != Y.. but personally I feel
> that even if the semantics for this scenario if intuitive for user to
> understand, doe that really make sense and should we really open the door
> for it. So I think maybe separating the final update in a separate API's
> benefits may overwhelm the advantage of having one uniform definition. And
> for my alternative proposal, the rationale was from both my concern about
> "suppressLateEvents" for windowed store, and Matthias' question about
> "suppressLateEvents" for non-windowed stores, that if it is less meaningful
> for both, we can consider removing it completely and only do
> "IntermediateSuppression" in Suppress instead.
>
> So I'd summarize my thoughts in the following questions:
>
> 1. Does "suppressLateEvents" with parameter Y != X (window retention time)
> for windowed stores make sense in practice?
> 2. Does "suppressLateEvents" with any parameter Y for non-windowed stores
> make sense in practice?
>
>
>
> Guozhang
>
>
> On Fri, Jun 29, 2018 at 2:26 PM, Bill Bejeck <bb...@gmail.com> wrote:
>
> > Thanks for the explanation, that does make sense.  I have some questions
> on
> > operations, but I'll just wait for the PR and tests.
> >
> > Thanks,
> > Bill
> >
> > On Wed, Jun 27, 2018 at 8:14 PM John Roesler <jo...@confluent.io> wrote:
> >
> > > Hi Bill,
> > >
> > > Thanks for the review!
> > >
> > > Your question is very much applicable to the KIP and not at all an
> > > implementation detail. Thanks for bringing it up.
> > >
> > > I'm proposing not to change the existing caches and configurations at
> all
> > > (for now).
> > >
> > > Imagine you have a topology like this:
> > > commit.interval.ms = 100
> > >
> > > (ktable1 (cached)) -> (suppress emitAfter 200)
> > >
> > > The first ktable (ktable1) will respect the commit interval and buffer
> > > events for 100ms before logging, storing, or forwarding them (IIRC).
> > > Therefore, the second ktable (suppress) will only see the events at a
> > rate
> > > of once per 100ms. It will apply its own buffering, and emit once per
> > 200ms
> > > This case is pretty trivial because the suppress time is a multiple of
> > the
> > > commit interval.
> > >
> > > When it's not an integer multiple, you'll get behavior like in this
> > marble
> > > diagram:
> > >
> > >
> > > <-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->
> > >
> > > [ KTable caching with commit interval = 2 ]
> > >
> > > <--------(k:2)---------(k:4)---------(k:6)->
> > >
> > >       [ suppress with emitAfter = 3 ]
> > >
> > > <---------------(k:2)----------------(k:6)->
> > >
> > >
> > > If this behavior isn't desired (for example, if you wanted to emit
> (k:3)
> > at
> > > time 3, I'd recommend setting the "cache.max.bytes.buffering" to 0 or
> > > modifying the topology to disable caching. Then, the behavior is more
> > > simply determined just by the suppress operator.
> > >
> > > Does that seem right to you?
> > >
> > >
> > > Regarding the changelogs, because the suppression operator hangs onto
> > > events for a while, it will need its own changelog. The changelog
> > > should represent the current state of the buffer at all times. So when
> > the
> > > suppress operator sees (k:2), for example, it will log (k:2). When it
> > > later gets to time 3, it's time to emit (k:2) downstream. Because k is
> no
> > > longer buffered, the suppress operator will log (k:null). Thus, when
> > > recovering,
> > > it can rebuild the buffer by reading its changelog.
> > >
> > > What do you think about this?
> > >
> > > Thanks,
> > > -John
> > >
> > >
> > >
> > > On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <bb...@gmail.com> wrote:
> > >
> > > > Hi John,  thanks for the KIP.
> > > >
> > > > Early on in the KIP, you mention the current approaches for
> controlling
> > > the
> > > > rate of downstream records from a KTable, cache size configuration
> and
> > > > commit time.
> > > >
> > > > Will these configuration parameters still be in effect for tables
> that
> > > > don't use suppression?  For tables taking advantage of suppression,
> > will
> > > > these configurations have no impact?
> > > > This last question may be to implementation specific but if the
> > requested
> > > > suppression time is longer than the specified commit time, will the
> > > latest
> > > > record in the suppression buffer get stored in a changelog?
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Wed, Jun 27, 2018 at 3:04 PM John Roesler <jo...@confluent.io>
> > wrote:
> > > >
> > > > > Thanks for the feedback, Matthias,
> > > > >
> > > > > It seems like in straightforward relational processing cases, it
> > would
> > > > not
> > > > > make sense to bound the lateness of KTables. In general, it seems
> > > better
> > > > to
> > > > > have "guard rails" in place that make it easier to write sensible
> > > > programs
> > > > > than insensible ones.
> > > > >
> > > > > But I'm still going to argue in favor of keeping it for all KTables
> > ;)
> > > > >
> > > > > 1. I believe it is simpler to understand the operator if it has one
> > > > uniform
> > > > > definition, regardless of context. It's well defined and intuitive
> > what
> > > > > will happen when you use late-event suppression on a KTable, so I
> > think
> > > > > nothing surprising or dangerous will happen in that case. From my
> > > > > perspective, having two sets of allowed operations is actually an
> > > > increase
> > > > > in cognitive complexity.
> > > > >
> > > > > 2. To me, it's not crazy to use the operator this way. For example,
> > in
> > > > lieu
> > > > > of full-featured timestamp semantics, I can implement MVCC behavior
> > > when
> > > > > building a KTable by "suppressLateEvents(Duration.ZERO)". I suspect
> > > that
> > > > > there are other, non-obvious applications of suppressing late
> events
> > on
> > > > > KTables.
> > > > >
> > > > > 3. Not to get too much into implementation details in a KIP
> > discussion,
> > > > but
> > > > > if we did want to make late-event suppression available only on
> > > windowed
> > > > > KTables, we have two enforcement options:
> > > > >   a. check when we build the topology - this would be simple to
> > > > implement,
> > > > > but would be a runtime check. Hopefully, people write tests for
> their
> > > > > topology before deploying them, so the feedback loop isn't
> > > instantaneous,
> > > > > but it's not too long either.
> > > > >   b. add a new WindowedKTable type - this would be a compile time
> > > check,
> > > > > but would also be substantial increase of both interface and code
> > > > > complexity.
> > > > >
> > > > > We should definitely strive to have guard rails protecting against
> > > > > surprising or dangerous behavior. Protecting against programs that
> we
> > > > don't
> > > > > currently predict is a lesser benefit, and I think we can put up
> > guard
> > > > > rails on a case-by-case basis for that. It seems like the increase
> in
> > > > > cognitive (and potentially code and interface) complexity makes me
> > > think
> > > > we
> > > > > should skip this case.
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <
> > > matthias@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Thanks for the KIP John.
> > > > > >
> > > > > > One initial comments about the last example "Bounded lateness":
> > For a
> > > > > > non-windowed KTable bounding the lateness does not really make
> > sense,
> > > > > > does it?
> > > > > >
> > > > > > Thus, I am wondering if we should allow `suppressLateEvents()`
> for
> > > this
> > > > > > case? It seems to be better to only allow it for
> windowed-KTables.
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > >
> > > > > > On 6/27/18 8:53 AM, Ted Yu wrote:
> > > > > > > I noticed this (lack of primary parameter) as well.
> > > > > > >
> > > > > > > What you gave as new example is semantically the same as what I
> > > > > > suggested.
> > > > > > > So it is good by me.
> > > > > > >
> > > > > > > Thanks
> > > > > > >
> > > > > > > On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <
> john@confluent.io
> > >
> > > > > wrote:
> > > > > > >
> > > > > > >> Thanks for taking look, Ted,
> > > > > > >>
> > > > > > >> I agree this is a departure from the conventions of Streams
> DSL.
> > > > > > >>
> > > > > > >> Most of our config objects have one or two "required"
> > parameters,
> > > > > which
> > > > > > fit
> > > > > > >> naturally with the static factory method approach. TimeWindow,
> > for
> > > > > > example,
> > > > > > >> requires a size parameter, so we can naturally say
> > > > > TimeWindows.of(size).
> > > > > > >>
> > > > > > >> I think in the case of a suppression, there's really no "core"
> > > > > > parameter,
> > > > > > >> and "Suppression.of()" seems sillier than "new
> Suppression()". I
> > > > think
> > > > > > that
> > > > > > >> Suppression.of(duration) would be ambiguous, since there are
> > many
> > > > > > durations
> > > > > > >> that we can configure.
> > > > > > >>
> > > > > > >> However, thinking about it again, I suppose that I can give
> each
> > > > > > >> configuration method a static version, which would let you
> > replace
> > > > > "new
> > > > > > >> Suppression()." with "Suppression." in all the examples.
> > > Basically,
> > > > > > instead
> > > > > > >> of "of()", we'd support any of the methods I listed.
> > > > > > >>
> > > > > > >> For example:
> > > > > > >>
> > > > > > >> windowCounts
> > > > > > >>     .suppress(
> > > > > > >>         Suppression
> > > > > > >>             .suppressLateEvents(Duration.ofMinutes(10))
> > > > > > >>             .suppressIntermediateEvents(
> > > > > > >>
> > > > > >  IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
> > > > > > >>             )
> > > > > > >>     );
> > > > > > >>
> > > > > > >>
> > > > > > >> Does that seem better?
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >> -John
> > > > > > >>
> > > > > > >>
> > > > > > >> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <yu...@gmail.com>
> > > > wrote:
> > > > > > >>
> > > > > > >>> I started to read this KIP which contains a lot of materials.
> > > > > > >>>
> > > > > > >>> One suggestion:
> > > > > > >>>
> > > > > > >>>     .suppress(
> > > > > > >>>         new Suppression()
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> Do you think it would be more consistent with the rest of
> > Streams
> > > > > data
> > > > > > >>> structures by supporting `of` ?
> > > > > > >>>
> > > > > > >>> Suppression.of(Duration.ofMinutes(10))
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> Cheers
> > > > > > >>>
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler <
> > john@confluent.io
> > > >
> > > > > > wrote:
> > > > > > >>>
> > > > > > >>>> Hello devs and users,
> > > > > > >>>>
> > > > > > >>>> Please take some time to consider this proposal for Kafka
> > > Streams:
> > > > > > >>>>
> > > > > > >>>> KIP-328: Ability to suppress updates for KTables
> > > > > > >>>>
> > > > > > >>>> link: https://cwiki.apache.org/confluence/x/sQU0BQ
> > > > > > >>>>
> > > > > > >>>> The basic idea is to provide:
> > > > > > >>>> * more usable control over update rate (vs the current state
> > > store
> > > > > > >>> caches)
> > > > > > >>>> * the final-result-for-windowed-computations feature which
> > > several
> > > > > > >> people
> > > > > > >>>> have requested
> > > > > > >>>>
> > > > > > >>>> I look forward to your feedback!
> > > > > > >>>>
> > > > > > >>>> Thanks,
> > > > > > >>>> -John
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>