Posted to dev@kafka.apache.org by Leah Thomas <lt...@confluent.io> on 2020/08/20 21:21:10 UTC

[DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Hi all,

I'd like to start a discussion for KIP-659:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size


The goal of the KIP is to ensure that window size is passed to the consumer
when needed, which will generally be for testing purposes, and to avoid
runtime errors when the *TimeWindowedSerde* is created without a window
size.

Looking forward to hearing your feedback.

Cheers,
Leah
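
To make the motivation concrete: as Sophie points out later in the thread, a time-windowed key can be stored with only its window start time. A deserializer therefore cannot rebuild the window end unless it is told the window size. The Java sketch below models just that idea using only the standard library. `WindowedKey`, `StartOnlyWindowCodec`, and the byte layout are hypothetical. They are not the Kafka Streams classes or wire format. Clamping an overflowing end time to `Long.MAX_VALUE` is also an assumption of the sketch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a windowed key; not Kafka Streams' Windowed/TimeWindow classes.
final class WindowedKey {
    final String key;
    final long startMs;
    final long endMs;

    WindowedKey(String key, long startMs, long endMs) {
        this.key = key;
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// Hypothetical codec that, like the start-only encoding described in the thread, never writes the end time.
final class StartOnlyWindowCodec {
    private StartOnlyWindowCodec() {
    }

    // Assumed layout for this sketch: UTF-8 key bytes followed by an 8-byte window start timestamp.
    static byte[] serialize(WindowedKey windowedKey) {
        byte[] keyBytes = windowedKey.key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(windowedKey.startMs)
                .array();
    }

    // The end time is not on the wire, so the caller must supply the window size to rebuild it.
    static WindowedKey deserialize(byte[] data, long windowSizeMs) {
        int keyLength = data.length - Long.BYTES;
        String key = new String(data, 0, keyLength, StandardCharsets.UTF_8);
        long startMs = ByteBuffer.wrap(data, keyLength, Long.BYTES).getLong();
        long endMs = startMs + windowSizeMs;
        if (endMs < startMs) {
            // Overflow (e.g. windowSizeMs == Long.MAX_VALUE): clamp to the largest timestamp.
            endMs = Long.MAX_VALUE;
        }
        return new WindowedKey(key, startMs, endMs);
    }
}
```

Deserializing with the window size that was actually used (60000 ms in the test below) restores the end time. Passing `Long.MAX_VALUE`, the fallback discussed in the thread, can only yield an unbounded end.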

Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Posted by John Roesler <vv...@apache.org>.
Thanks, Leah,

This sounds good!

-John

On Wed, Sep 2, 2020, at 19:23, Matthias J. Sax wrote:
> Thanks for the KIP and the detailed discussion. I guess this all makes
> sense.
> 
> -Matthias
> 
> On 9/2/20 1:28 PM, Leah Thomas wrote:
> > Hey John,
> > 
> > I see what you say about the console consumer in particular. I don't think
> > that adding the extra config would *hurt* at all, so I'm good with keeping
> > that in the KIP. I re-updated the KIP proposal to include the configs.
> > 
> > The serde resolution sounds good to me as well, I added a few lines in the
> > KIP about logging an error when the *timeWindowedSerde* implicit is called.
> > 
> > Let me know if there are any other concerns, else I'll resume voting.
> > 
> > Cheers,
> > Leah
> > 
> > On Tue, Sep 1, 2020 at 11:17 AM John Roesler <vv...@apache.org> wrote:
> > 
> >> Hi Leah and Sophie,
> >>
> >> Sorry for the delayed response.
> >>
> >> You can pass in pre-instantiated (and therefore arbitrarily
> >> constructed) deserializers to the KafkaConsumer. However,
> >> this doesn't mean we should drop the configs. The same
> >> argument for dropping the configs implies that the consumer
> >> shouldn't have configs for setting the deserializers at all.
> >> This doesn't sound right, and I'm asking myself why. The
> >> most likely answer seems to me to be that you sometimes
> >> create a Consumer without invoking the Java constructor at
> >> all. For example, when you use the console-consumer. In that
> >> case, it would be indispensable to be able to fully
> >> configure the deserializers via a properties file.
> >>
> >> Therefore, I think we should go ahead and propose the new
> >> config. (Sorry for the flip-flop, Leah)
> >>
> >> Regarding the implicits, Leah's conclusion sounds good to
> >> me. Yuriy is not adding any implicit for this serde to the
> >> new class, and we'll just add an ERROR log to the existing
> >> implicit. Once KIP-616 is merged, the existing implicit will
> >> be deprecated along with all the other implicits in that
> >> class, so there will be two "forces" pushing people to the
> >> new interface, where they will discover the lack of an
> >> implicit, which then forces them to call the non-deprecated
> >> constructors directly.
> >>
> >> To answer Sophie's question, "implicit" is a feature of
> >> Scala that allows the type system to automatically resolve
> >> method arguments when there is just one possible argument in
> >> scope. There's a bunch of docs for it, so I won't waste a
> >> ton of e-ink on the details; the docs will be crystal clear
> >> just assuming you know all about monads and monoids and
> >> type-level programming ;)
> >>
> >> The punch line for us is that we provide implicits for the
> >> basic serdes, and also for turning pairs of
> >> serializers/deserializers into serdes, so you can avoid
> >> explicitly passing any serdes into Streams DSL operations,
> >> but also not have to fall back on the default key/value
> >> serde configs. Instead, the type system will plug in the
> >> right serde for the K/V types at each operation.
> >>
> >> We would _not_ add an implicit for a serde that we can't
> >> construct in a context-free way using just type information,
> >> as in this case. That's why Yuriy dropped the new implicit
> >> and why we're going to add an error to the existing
> >> implicit. On the other hand, removing the existing implicit
> >> will cause compiler errors when the type system is no longer
> >> able to find a suitable argument for an implicit parameter,
> >> so we don't want to just remove the existing implicit.
> >>
> >> Thanks,
> >> -John
> >>
> >> On Mon, 2020-08-31 at 16:28 -0500, Leah Thomas wrote:
> >>> Hey Sophie,
> >>>
> >>> Thanks for the catch! It makes sense that the consumer would accept a
> >>> deserializer somewhere, so we can definitely skip the additional
> >>> configs. I updated the KIP to reflect that.
> >>>
> >>> John seems to know Scala better than I do as well, but I think we need
> >>> to keep the current implicit that allows users to just pass in a serde
> >>> and no window size, for backwards compatibility. It seems to me that,
> >>> based on the discussion around KIP-616
> >>> <https://github.com/apache/kafka/pull/8955>, we can pretty easily do
> >>> John's third suggestion for handling this implicit: logging an error
> >>> message and passing through to a non-deprecated constructor using some
> >>> default value. It seems from KIP-616 that most Scala users will use the
> >>> new Serdes class anyway, and Yuriy is just removing these implicits, so
> >>> whatever fix we decide on for this class probably won't get used too
> >>> heavily.
> >>>
> >>> Cheers,
> >>> Leah
> >>>
> >>> On Thu, Aug 27, 2020 at 8:49 PM Sophie Blee-Goldman <sophie@confluent.io> wrote:
> >>>
> >>>> Ok I'm definitely feeling pretty dumb now, but I was just thinking how
> >>>> ridiculous it is that the Consumer forces you to configure your
> >>>> Deserializer through actual config maps instead of just taking the ones
> >>>> you pass in directly. So I thought "why not just fix the Consumer to
> >>>> allow passing in an actual Deserializer object" and went through the
> >>>> code in case there's some legitimate reason why not, and what do you
> >>>> know: you actually can pass in an actual Deserializer object! There is
> >>>> a KafkaConsumer constructor that accepts a key and value Deserializer,
> >>>> and it doesn't instantiate or configure a new one if they are provided
> >>>> this way. Duh.
> >>>>
> >>>> Sorry for misleading everyone on that front. I'm just happy to find out
> >>>> that a reasonable way of configuring a deserializer actually *is*
> >>>> possible after all. In that case, maybe we can remove the extra configs
> >>>> from this KIP and just proceed with the deprecation?
> >>>>
> >>>> Obviously that doesn't help anything with regard to the remaining
> >>>> question that John/Leah have posed. Now, I probably don't have anything
> >>>> valuable to offer there since I know next to nothing about Scala, but I
> >>>> do want to better understand: why would we add an "implicit" (what
> >>>> exactly does this mean?) that relies on allowing users to not set the
> >>>> windowSize, if we are explicitly taking away that option from the Java
> >>>> users? Or if we have already added something, can't we just deprecate
> >>>> it like we are deprecating the Java constructor? I may need some
> >>>> remedial lessons in Scala just to understand the problem that we
> >>>> apparently have, because I don't get it.
> >>>>
> >>>> By the way, I'm a little tempted to say that we should go one step
> >>>> further and deprecate the DEFAULT_WINDOWED_INNER_CLASS configs, but
> >>>> maybe that's a bit too radical for the moment. It just seems like
> >>>> default serde configs have been a lot more trouble than they're worth
> >>>> overall. That said, these particular configs don't appear to have hurt
> >>>> anyone thus far, at least not that we know of (possibly because no one
> >>>> is using them anyway), so there's no strong motivation to do so.
> >>>>
> >>>> On Wed, Aug 26, 2020 at 9:19 AM Leah Thomas <lt...@confluent.io> wrote:
> >>>>
> >>>>> Hey John,
> >>>>>
> >>>>> Thanks for pointing this out, I wasn't sure how to handle the Scala
> >>>>> changes.
> >>>>>
> >>>>> I'm not fully versed in the Scala version of Streams, so feel free to
> >>>>> correct me if any of my assumptions are wrong. I think logging an error
> >>>>> message and then calling the constructor that requires a windowSize
> >>>>> seems like the simplest fix from my point of view. So instead of
> >>>>> calling `TimeWindowedSerde(final Serde<T> inner)`, we could call
> >>>>> `TimeWindowedSerde(final Serde<T> inner, final long windowSize)` with
> >>>>> Long.MAX_VALUE as the window size.
> >>>>>
> >>>>> I do feel like we would want to add an implicit to `Serdes.scala` that
> >>>>> takes a serde and a window size so that users can access the
> >>>>> constructor that initializes with the correct window size. I agree with
> >>>>> your comment on the KIP-616 PR that the serde needs to be
> >>>>> pre-configured when it's passed, but I'm not sure we would need a
> >>>>> windowSize config. I think if the constructor is passed the serde and
> >>>>> the window size, then window size should be set within the
> >>>>> deserializer. The only catch is if the Scala version of the consumer
> >>>>> creates a new deserializer, and at that point we'd need a window size
> >>>>> config, but I'm not sure if that's the case.
> >>>>>
> >>>>> WDYT - is it possible to alter the existing implicit and add a new one?
> >>>>>
> >>>>> On Wed, Aug 26, 2020 at 10:00 AM John Roesler <vv...@apache.org> wrote:
> >>>>>> Hi Leah,
> >>>>>>
> >>>>>> I was just reviewing the PR for KIP-616 and realized that we
> >>>>>> forgot to mention the Scala API in your KIP. We should
> >>>>>> consider it because `scala.Serdes.timeWindowedSerde` is
> >>>>>> implicitly using the exact constructor you're deprecating.
> >>>>>>
> >>>>>> I had some ideas in the code review:
> >>>>>> https://github.com/apache/kafka/pull/8955#discussion_r477358755
> >>>>>>
> >>>>>> What do you think is the best approach?
> >>>>>>
> >>>>>> Concretely, I think Yuriy can make the call for KIP-616 (for
> >>>>>> the new implicit that he's adding). But I think your KIP-659
> >>>>>> should mention how we modify the existing implicit.
> >>>>>>
> >>>>>> Typically, we'd try to avoid throwing new exceptions or
> >>>>>> causing compile errors, so
> >>>>>> * dropping the implicit is probably off the table (compile
> >>>>>> error).
> >>>>>> * throwing an exception in the deserializer may not be ok,
> >>>>>> although it might still actually be ok since it's adding a
> >>>>>> corruption check.
> >>>>>> * logging an ERROR message and then passing through to the
> >>>>>> underlying deserializer would be more conservative.
> >>>>>>
> >>>>>> What do you think we should do?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> -John
> >>>>>>
> >>>>>> On Fri, 2020-08-21 at 16:05 -0500, Leah Thomas wrote:
> >>>>>>> Thanks for the typo catch, John.
> >>>>>>>
> >>>>>>> Let me know if anyone else has thoughts or ideas.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Leah
> >>>>>>>
> >>>>>>> On Fri, Aug 21, 2020 at 2:50 PM John Roesler <vvcephei@apache.org> wrote:
> >>>>>>>> Thanks, all,
> >>>>>>>>
> >>>>>>>> Based on my reading of the conversation, it sounds like I
> >>>>>>>> have some legwork to do in KIP-645, but our collective
> >>>>>>>> instinct is that Leah's proposal doesn't need to change to
> >>>>>>>> account for whatever we might decide to do in KIP-645.
> >>>>>>>>
> >>>>>>>> I have no further concerns about KIP-659, and I think it's a
> >>>>>>>> good proposal.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> -John
> >>>>>>>>
> >>>>>>>> P.S., there's still a typo on the wiki that says
> >>>>>>>> "ConsumerConfig" on the code block, even though the text now
> >>>>>>>> says "StreamsConfig".
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, 2020-08-21 at 10:56 -0700, Sophie Blee-Goldman wrote:
> >>>>>>>>> Just want to make a quick comment on the question that John raised
> >>>>>>>>> about whether we should introduce a separate config for "key" and
> >>>>>>>>> "value" window sizes:
> >>>>>>>>> My short answer is No, I don't think that's necessary. First of all,
> >>>>>>>>> as you said, there is no first-class concept of a "Windowed value" in
> >>>>>>>>> the DSL. Second, to engage in your rhetorical question, if there's no
> >>>>>>>>> default window size for a Streams program then how can there be a
> >>>>>>>>> sensible default for the key AND a separate sensible default for a
> >>>>>>>>> value? I don't think we need to follow the existing pattern if it
> >>>>>>>>> doesn't make sense, and to be honest I'm a bit skeptical that anyone
> >>>>>>>>> was even using these default windowed inner classes since the config
> >>>>>>>>> wasn't even defined/documented until quite recently. I'd actually be
> >>>>>>>>> in favor of deprecating
> >>>>>>>>> StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS but I don't
> >>>>>>>>> want to drag that into this discussion as well.
> >>>>>>>>>
> >>>>>>>>> My understanding is that these were meant to mirror the default
> >>>>>>>>> key/value serde configs, but the real use of the
> >>>>>>>>> DEFAULT_WINDOWED_SERDE_INNER_CLASS config is actually that you can at
> >>>>>>>>> least use it to configure the inner class for a Consumer, thus making
> >>>>>>>>> the TimeWindowed serdes functional at a basic level. With the window
> >>>>>>>>> size configs, the point is not really to set a default but to make it
> >>>>>>>>> actually work with a Consumer which instantiates the deserializer by
> >>>>>>>>> reflection. So I don't think we should position this new config as a
> >>>>>>>>> "default" (although it may technically behave as one) -- within
> >>>>>>>>> Streams, users can and should always supply the window size through
> >>>>>>>>> the constructor. I don't think that's such an inconvenience, vs the
> >>>>>>>>> amount of confusion that will be (and has been) caused by default
> >>>>>>>>> serde-related configs in Streams.
> >>>>>>>>>
> >>>>>>>>> Regarding the fixed vs variable sized config, one idea I had was to
> >>>>>>>>> just keep the fixed-size config and constructor and let users of
> >>>>>>>>> enumerable windows override the TimeWindowedSerde class(es) to do
> >>>>>>>>> whatever it is they need. IIUC you already have to override some
> >>>>>>>>> other windows-related classes to get variable-sized windows, so doing
> >>>>>>>>> the same for the serdes sounds reasonable to me. Just my take on the
> >>>>>>>>> "simple things should be easy, difficult things should be possible"
> >>>>>>>>> mantra.
> >>>>>>>>>
> >>>>>>>>> One last quick side note: the reason we don't really need to discuss
> >>>>>>>>> SessionWindows here is that they already encode both the start and
> >>>>>>>>> end time for the window. This is probably the best way to go for
> >>>>>>>>> TimeWindows as well, but making this change in a backwards
> >>>>>>>>> compatible way is a much larger scope of work. And even then, we
> >>>>>>>>> might want to consider making it possible to still just encode the
> >>>>>>>>> start time to save space, thus requiring this config either way.
> >>>>>>>>>
> >>>>>>>>> On Fri, Aug 21, 2020 at 9:26 AM Leah Thomas <lthomas@confluent.io> wrote:
> >>>>>>>>>> Thanks John and Walker for your thoughts.
> >>>>>>>>>>
> >>>>>>>>>> I agree with your two scenarios, John: either you configure fully in
> >>>>>>>>>> the constructor and don't need to call `init()`, or you use the
> >>>>>>>>>> default constructor and configure through `init()`. IIUC, if we pass
> >>>>>>>>>> the deserializer to the consumer, we want to make sure it has the
> >>>>>>>>>> window size set using the newly required constructor. If we don't
> >>>>>>>>>> pass in the deserializer, the window size will be set through the
> >>>>>>>>>> configs. To answer Walker's question directly: because the configs
> >>>>>>>>>> aren't passed to the constructor, we can't set the window size unless
> >>>>>>>>>> we pass it to the constructor or configure the deserializer after
> >>>>>>>>>> instantiating it.
> >>>>>>>>>>
> >>>>>>>>>> For users who would rather not set a strict window size (outside of
> >>>>>>>>>> the variable size scenario), they can pass in Long.MAX_VALUE. The way
> >>>>>>>>>> I see it, instead of making the default fit scenarios that don't
> >>>>>>>>>> require a window size, we make the default fit the scenarios that
> >>>>>>>>>> *do*, flipping the current implementation to fit typical use cases.
> >>>>>>>>>>
> >>>>>>>>>> On your points, John:
> >>>>>>>>>> 1. I agree that it makes sense to store it in StreamsConfig; this
> >>>>>>>>>> shouldn't cause any issues. I've updated the KIP accordingly.
> >>>>>>>>>>
> >>>>>>>>>> 2. The non-fixed time windows issue is a good point. It seems like
> >>>>>>>>>> calendar windows in particular are quite useful, so I think we want
> >>>>>>>>>> to make sure that this wouldn't inhibit flexibly sized windows. I
> >>>>>>>>>> think having two different configs and functions makes sense,
> >>>>>>>>>> although it is slightly messier. While requiring all time windows to
> >>>>>>>>>> use the WindowFunction constructor would work, I think that allowing
> >>>>>>>>>> users to access the WindowSize constructor is preferable because it
> >>>>>>>>>> seems easier to use for people who are not at all interested in
> >>>>>>>>>> delving into variably sized windows. This assumption could be wrong,
> >>>>>>>>>> though, and perhaps users would adapt quickly to the new
> >>>>>>>>>> WindowFunction style, but my immediate reaction is to support both
> >>>>>>>>>> configs and constructors.
> >>>>>>>>>>
> >>>>>>>>>> One note on this is that Session Windows are handled separately from
> >>>>>>>>>> time windows and also have variable window sizes. I assume that the
> >>>>>>>>>> TimeWindowed option is preferable for variably sized windows because
> >>>>>>>>>> you still want to access the window end times? But I think one
> >>>>>>>>>> alternative could be separating the variably sized windows from the
> >>>>>>>>>> current implementation of time windows, although I think KIP-645
> >>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> >>>>>>>>>> would make this not strictly necessary.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Leah
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Aug 21, 2020 at 10:04 AM John Roesler <vvcephei@apache.org> wrote:
> >>>>>>>>>>> Hi Leah,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the KIP! This has been a real pain for some use
> >>>>>>>>>>> cases, so it's really good to see a proposal to fix it.
> >>>>>>>>>>>
> >>>>>>>>>>> We do need a default constructor so that it can be
> >>>>>>>>>>> dynamically instantiated by the consumer (or any other
> >>>>>>>>>>> component). But I'm +1 on deprecating the constructor you're
> >>>>>>>>>>> proposing to deprecate, which only partially configures the
> >>>>>>>>>>> class. It seems like there are exactly two patterns: either
> >>>>>>>>>>> you fully configure the class in the constructor and don't
> >>>>>>>>>>> call `init()`, or you call the default constructor and then
> >>>>>>>>>>> configure the class by calling `init()`.
> >>>>>>>>>>>
> >>>>>>>>>>> I can appreciate Walker's point, but stepping back, it
> >>>>>>>>>>> doesn't actually seem that useful to partially configure the
> >>>>>>>>>>> class in the constructor and then finish up the
> >>>>>>>>>>> configuration by calling `init()`. I could see the argument
> >>>>>>>>>>> if there were a sensible default, but for this particular
> >>>>>>>>>>> class, there isn't one. Rhetorical question: what is the
> >>>>>>>>>>> default window size for Streams programs?
> >>>>>>>>>>>
> >>>>>>>>>>> I have a couple of concerns to discuss:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. Config Location
> >>>>>>>>>>>
> >>>>>>>>>>> I don't think I would add the new config to ConsumerConfig;
> >>>>>>>>>>> I would add it to StreamsConfig instead. The deserializer
> >>>>>>>>>>> itself is in Streams (it is
> >>>>>>>>>>> o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
> >>>>>>>>>>> odd to have one of its configurations in a completely
> >>>>>>>>>>> different module.
> >>>>>>>>>>>
> >>>>>>>>>>> Also, this class already has two configs, which are in
> >>>>>>>>>>> StreamsConfig:
> >>>>>>>>>>> StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
> >>>>>>>>>>> StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> >>>>>>>>>>>
> >>>>>>>>>>> It seems like the new config belongs right next to the
> >>>>>>>>>>> existing ones.
> >>>>>>>>>>>
> >>>>>>>>>>> For me, it raises a secondary question:
> >>>>>>>>>>> 1b: Should there be a KEY_WINDOW_SIZE and a
> >>>>>>>>>>> VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
> >>>>>>>>>>> value" even is, but the fact that we can configure serdes
> >>>>>>>>>>> for it implies that perhaps we should symmetrically
> >>>>>>>>>>> configure its size as well.
> >>>>>>>>>>>
> >>>>>>>>>>> 2. Fixed Size Assumption
> >>>>>>>>>>>
> >>>>>>>>>>> In KIP-645, I'm proposing to lift the assumption that
> >>>>>>>>>>> TimeWindows have a fixed size at all, but KIP-659 is
> >>>>>>>>>>> currently built on that assumption.
> >>>>>>>>>>>
> >>>>>>>>>>> For details on why this is not a good assumption, see:
> >>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10408
> >>>>>>>>>>>
> >>>>>>>>>>> In fact, in my POC PR for KIP-645, I'm dropping the
> >>>>>>>>>>> constructor that takes a "window size" parameter in favor of
> >>>>>>>>>>> one that takes a window function, mapping a window start
> >>>>>>>>>>> time to a full Window(start, end).
> >>>>>>>>>>>
> >>>>>>>>>>> In that context, it seems incongruous to introduce a
> >>>>>>>>>>> configuration that specifies a window size. Of course, my
> >>>>>>>>>>> KIP is also under discussion, so my proposal may not
> >>>>>>>>>>> eventually be accepted. But it is necessary to consider both
> >>>>>>>>>>> of these concerns together.
> >>>>>>>>>>>
> >>>>>>>>>>> One option seems to be to accept both. Namely, we keep the
> >>>>>>>>>>> "fixed size" constructor AND add my new constructor (for
> >>>>>>>>>>> variably sized windows). Likewise, we accept your proposal,
> >>>>>>>>>>> and KIP-659 would propose to add a new config specifying a
> >>>>>>>>>>> windowing function, such as:
> >>>>>>>>>>>
> >>>>>>>>>>>> StreamsConfig.WINDOW_FUNCTION_CONFIG
> >>>>>>>>>>>
> >>>>>>>>>>> which would be an instance of:
> >>>>>>>>>>>
> >>>>>>>>>>>> public interface WindowFunction extends Function<Long, Window> {}
> >>>>>>>>>>>
> >>>>>>>>>>> I'm not bringing these up for discussion in your KIP right
> >>>>>>>>>>> now, just demonstrating the feasibility of merging both
> >>>>>>>>>>> proposals.
> >>>>>>>>>>>
> >>>>>>>>>>> My question for you: do you think the general strategy of
> >>>>>>>>>>> having two constructors and two configs, one for fixed and
> >>>>>>>>>>> one for variable windows, makes sense? Is it too
> >>>>>>>>>>> complicated? Do you have a better idea?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks!
> >>>>>>>>>>> -John
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> >>>>>>>>>>>> Hi Leah,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Could you explain a bit more why we do not wish to let
> >>>>>>>>>>>> TimeWindowedDeserializer and WindowedSerdes be created without a
> >>>>>>>>>>>> specified time as a parameter?
> >>>>>>>>>>>>
> >>>>>>>>>>>> I understand that Long.MAX_VALUE could cause problems, but would it
> >>>>>>>>>>>> not be a good idea to have a usable default or fetch from the
> >>>>>>>>>>>> config if available? After all, you are proposing to add
> >>>>>>>>>>>> "window.size.ms".
> >>>>>>>>>>>>
> >>>>>>>>>>>> We definitely need a fix to this problem, and adding
> >>>>>>>>>>>> "window.size.ms" makes sense to me.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the KIP,
> >>>>>>>>>>>> Walker
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <lthomas@confluent.io> wrote:
> >>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I'd like to start a discussion for KIP-659:
> >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The goal of the KIP is to ensure that window size is passed to the
> >>>>>>>>>>>>> consumer when needed, which will generally be for testing purposes,
> >>>>>>>>>>>>> and to avoid runtime errors when the *TimeWindowedSerde* is created
> >>>>>>>>>>>>> without a window size.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Looking forward to hearing your feedback.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>> Leah
> >>>>>>>>>>>>>
> >>
> >>
> > 
> 
> 

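The thread converges on two supported ways to build the deserializer, as John summarized on Aug 21 and reaffirmed on Sep 1. One is to fully configure it through a constructor that takes the window size. The other is to create it through the no-argument constructor and finish configuring it from a config map. That second path is what a consumer must do when it only has a class name from a properties file (for example, the console consumer). The Java sketch below models only that pattern and uses just the standard library. `SketchTimeWindowedDeserializer` and `fromConfig` are hypothetical names, not Kafka APIs. The thread calls the second step `init()`, while the sketch names it `configure`. The `"window.size.ms"` key is the config name proposed in the thread, and rejecting a missing value with an exception is an assumption of the sketch.

```java
import java.util.Map;

// Hypothetical model of the two construction patterns discussed in the thread.
// This is NOT Kafka Streams' TimeWindowedDeserializer; it only tracks the window size.
final class SketchTimeWindowedDeserializer {
    // Config name proposed in the thread; used here as a plain string key.
    static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";

    // null until the instance is fully configured
    private Long windowSizeMs;

    // Pattern 2: no-arg constructor, for clients that instantiate the class reflectively.
    SketchTimeWindowedDeserializer() {
    }

    // Pattern 1: fully configured up front, so configure() has nothing left to do.
    SketchTimeWindowedDeserializer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Finishes configuration from a config map (e.g. one built from a properties file).
    void configure(Map<String, ?> configs) {
        if (windowSizeMs != null) {
            return; // already fully configured through the constructor
        }
        Object raw = configs.get(WINDOW_SIZE_MS_CONFIG);
        if (raw == null) {
            throw new IllegalArgumentException(WINDOW_SIZE_MS_CONFIG + " must be set");
        }
        windowSizeMs = Long.parseLong(raw.toString());
    }

    long windowSizeMs() {
        if (windowSizeMs == null) {
            throw new IllegalStateException("window size was never configured");
        }
        return windowSizeMs;
    }

    // Mimics a client that knows only the class, not its constructor arguments.
    static SketchTimeWindowedDeserializer fromConfig(Map<String, ?> configs) {
        SketchTimeWindowedDeserializer deserializer;
        try {
            deserializer = SketchTimeWindowedDeserializer.class.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate deserializer", e);
        }
        deserializer.configure(configs);
        return deserializer;
    }
}
```

In this sketch the constructor path never consults the config map. That mirrors the "fully configure in the constructor and don't call `init()`" pattern, while the reflective path fails fast when the window size is missing.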
Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Posted by "Matthias J. Sax" <mj...@apache.org>.
Thanks for the KIP and the detailed discussion. I guess this all makes
sense.

-Matthias
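
For the existing Scala `timeWindowedSerde` implicit, Leah proposed (Aug 26) and John agreed (Sep 1) to keep it, log an ERROR, and delegate to the constructor that takes a window size, using `Long.MAX_VALUE`. Removing the implicit outright would cause compile errors. The examples here use Java, so the sketch below shows the same fallback as a deprecated Java overload rather than a Scala implicit. `SketchTimeWindowedSerde` and `SketchSerdes` are hypothetical names, and `java.util.logging`'s `SEVERE` level stands in for the ERROR log described in the thread.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for a time-windowed serde; it only records the inner type and window size.
final class SketchTimeWindowedSerde<T> {
    final Class<T> innerType;
    final long windowSizeMs;

    SketchTimeWindowedSerde(Class<T> innerType, long windowSizeMs) {
        this.innerType = innerType;
        this.windowSizeMs = windowSizeMs;
    }
}

// Hypothetical factory standing in for the existing Scala implicit discussed in the thread.
final class SketchSerdes {
    private static final Logger LOG = Logger.getLogger(SketchSerdes.class.getName());

    private SketchSerdes() {
    }

    // Preferred entry point: the caller states the window size explicitly.
    static <T> SketchTimeWindowedSerde<T> timeWindowedSerde(Class<T> innerType, long windowSizeMs) {
        return new SketchTimeWindowedSerde<>(innerType, windowSizeMs);
    }

    // Kept so existing callers still compile; logs an error and falls back to Long.MAX_VALUE.
    @Deprecated
    static <T> SketchTimeWindowedSerde<T> timeWindowedSerde(Class<T> innerType) {
        LOG.severe("timeWindowedSerde called without a window size; window end times will not be "
                + "reconstructed correctly. Pass the window size explicitly instead.");
        return timeWindowedSerde(innerType, Long.MAX_VALUE);
    }
}
```

Callers of the old overload keep compiling but see the error in their logs. Once the surrounding API is deprecated as well, as John describes for KIP-616, users are pushed toward the explicit overload.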

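John's Aug 21 message outlines a `WindowFunction` that maps a window start time to a full `Window(start, end)`. This lifts the fixed-size assumption (see KAFKA-10408). The Java sketch below shows how a fixed size becomes one case of such a function, while calendar months cannot be described by a single size. `SketchWindow` stands in for the Streams `Window` class, treating the end as exclusive is an assumption, and both factory methods are hypothetical illustrations that are not part of either KIP.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.function.Function;

// Hypothetical window: inclusive start and (assumed) exclusive end, in epoch milliseconds.
final class SketchWindow {
    final long startMs;
    final long endMs;

    SketchWindow(long startMs, long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// Maps a window start time to the full window, in the spirit of the WindowFunction idea from the thread.
interface WindowFunction extends Function<Long, SketchWindow> {

    // Fixed-size windows become one special case of a window function.
    static WindowFunction fixedSize(long sizeMs) {
        return startMs -> new SketchWindow(startMs, startMs + sizeMs);
    }

    // Calendar-month windows in UTC, assuming startMs is the first instant of a month.
    // The window length depends on the start, so no single window size fits all of them.
    static WindowFunction calendarMonthUtc() {
        return startMs -> {
            ZonedDateTime start = Instant.ofEpochMilli(startMs).atZone(ZoneOffset.UTC);
            long endMs = start.plusMonths(1).toInstant().toEpochMilli();
            return new SketchWindow(startMs, endMs);
        };
    }
}
```

With these definitions, January 2021 yields a 31-day window and February 2021 a 28-day one, which a single `window.size.ms` value cannot express.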
On 9/2/20 1:28 PM, Leah Thomas wrote:
> Hey John,
> 
> I see what you say about the console consumer in particular. I don't think
> that adding the extra config would *hurt* at all, so I'm good with keeping
> that in the KIP. I re-updated the KIP proposal to include the configs.
> 
> The serde resolution sounds good to me as well, I added a few lines in the
> KIP about logging an error when the *timeWindowedSerde *implicit is called.
> 
> Let me know if there are any other concerns, else I'll resume voting.
> 
> Cheers,
> Leah
> 
> On Tue, Sep 1, 2020 at 11:17 AM John Roesler <vv...@apache.org> wrote:
> 
>> Hi Leah and Sophie,
>>
>> Sorry for the delayed response.
>>
>> You can pass in pre-instantiated (and therefore arbirarily
>> constructed) deserializers to the KafkaConsumer. However,
>> this doesn't mean we should drop the configs. The same
>> argument for dropping the configs implies that the consumer
>> shouldn't have configs for setting the deserializers at all.
>> This doesn't sound right, and I'm asking myself why. The
>> most likely answer seems to me to be that you sometimes
>> create a Consumer without invoking the Java constructor at
>> all. For example, when you use the console-consumer. In that
>> case, it would be indispensible to be able to fully
>> configure the deserializers via a properties file.
>>
>> Therefore, I think we should go ahead and propose the new
>> config. (Sorry for the flip-flop, Leah)
>>
>> Regarding the implicits, Leah's conclusion sounds good to
>> me. Yuriy is not adding any implicit for this serde to the
>> new class, and we'll just add an ERROR log to the existing
>> implicit. Once KIP-616 is merged, the existing implicit will
>> be deprecated along with all the other implicits in that
>> class, so there will be two "forces" pushing people to the
>> new interface, where they will discover the lack of an
>> implicit, which then forces them to call the non-deprecated
>> constructors directly.
>>
>> To answer Sophie's question, "implicit" is a feature of
>> Scala that allows the type system to automatically resolve
>> method arguments when there is just one possible argument in
>> scope. There's a bunch of docs for it, so I won't waste a
>> ton of e-ink on the details; the docs will be crystal clear
>> just assuming you know all about monads and monoids and
>> type-level programming ;)
>>
>> The punch line for us is that we provide implicits for the
>> basic serdes, and also for turning pairs of
>> serializers/deserializers into serdes, so you can avoid
>> explicitly passing any serdes into Streams DSL operations,
>> but also not have to fall back on the default key/value
>> serde configs. Instead, the type system will plug in the
>> right serde for the K/V types at each operation.
>>
>> We would _not_ add an implicit for a serde that we can't
>> construct in a context-free way using just type information,
>> as in this case. That's why Yuriy dropped the new implicit
>> and why we're going to add an error to the existing
>> implicit. On the other hand, removing the existing implicit
>> will cause compiler errors when the type system is no longer
>> able to find a suitable argument for an implicit parameter,
>> so we don't want to just remove the existing implicit.
>>
>> Thanks,
>> -John
>>
>> On Mon, 2020-08-31 at 16:28 -0500, Leah Thomas wrote:
>>> Hey Sophie,
>>>
>>> Thanks for the catch! It makes sense that the consumer would accept a
>>> deserializer somewhere, so we can definitely skip the additional
>> configs. I
>>> updated the KIP to reflect that.
>>>
>>> John seems to know Scala better than I do as well, but I think we need to
>>> keep the current implicit that allows users to just pass in a serde and
>> no
>>> window size for backwards compatibility. It seems to me that based on the
>>> discussion around KIP-616 <https://github.com/apache/kafka/pull/8955>;,
>> we
>>> can pretty easily do John's third suggestion for handling this implicit:
>>> logging an error message and passing to a non-deprecated constructor
>> using
>>> some default value. It seems from KIP-616 that most scala users will use
>>> the new Serdes class anyways, and Yuriy is just removing these implicits
>> so
>>> it seems like whatever fix we decide for this class won't get used too
>>> heavily.
>>>
>>> Cheers,
>>> Leah
>>>
>>> On Thu, Aug 27, 2020 at 8:49 PM Sophie Blee-Goldman <sophie@confluent.io
>>>
>>> wrote:
>>>
>>>> Ok I'm definitely feeling pretty dumb now, but I was just thinking how
>>>> ridiculous
>>>> it is that the Consumer forces you to configure your Deserializer
>> through
>>>> actual
>>>> config maps instead of just taking the ones you pass in directly. So I
>>>> thought
>>>> "why not just fix the Consumer to allow passing in an actual
>> Deserializer
>>>> object"
>>>> and went to go through the code in case there's some legitimate reason
>> why
>>>> not,
>>>> and what do you know. You actually can pass in an actual Deserializer
>>>> object!
>>>> There is a KafkaConsumer constructor that accepts a key and value
>>>> Deserializer,
>>>> and doesn't instantiate or configure a new one if provided in this way.
>>>> Duh.
>>>>
>>>> Sorry for misleading everyone on that front. I'm just happy to find out
>>>> that a reasonable way of configuring the deserializer actually *is*
>>>> possible after all. In that case, maybe we can remove the extra configs
>>>> from this KIP and just proceed with the deprecation?
>>>>
>>>> Obviously that doesn't help with the remaining question that John/Leah
>>>> have posed. I probably don't have anything valuable to offer there, since
>>>> I know next to nothing about Scala, but I do want to better understand:
>>>> why would we add an "implicit" (what exactly does this mean?) that relies
>>>> on allowing users to not set the windowSize, if we are explicitly taking
>>>> away that option from the Java users? Or, if we have already added
>>>> something, can't we just deprecate it like we are deprecating the Java
>>>> constructor? I may need some remedial lessons in Scala just to understand
>>>> the problem that we apparently have, because I don't get it.
>>>>
>>>> By the way, I'm a little tempted to say that we should go one step
>>>> further and deprecate the DEFAULT_WINDOWED_INNER_CLASS configs, but maybe
>>>> that's a bit too radical for the moment. It just seems like default serde
>>>> configs have been a lot more trouble than they're worth overall. That
>>>> said, these particular configs don't appear to have hurt anyone thus far,
>>>> at least not that we know of (possibly because no one is using them
>>>> anyway), so there's no strong motivation to do so.
>>>>
>>>> On Wed, Aug 26, 2020 at 9:19 AM Leah Thomas <lt...@confluent.io>
>>>> wrote:
>>>>
>>>>> Hey John,
>>>>>
>>>>> Thanks for pointing this out; I wasn't sure how to handle the Scala
>>>>> changes.
>>>>>
>>>>> I'm not fully versed in the Scala version of Streams, so feel free to
>>>>> correct me if any of my assumptions are wrong. From my point of view, the
>>>>> simplest fix seems to be logging an error message and then calling the
>>>>> constructor that requires a windowSize. So instead of calling
>>>>> `TimeWindowedSerde(final Serde<T> inner)`, we could call
>>>>> `TimeWindowedSerde(final Serde<T> inner, final long windowSize)` with
>>>>> Long.MAX_VALUE as the window size.
>>>>>
>>>>> I do feel like we would want to add an implicit to `Serdes.scala` that
>>>>> takes a serde and a window size, so that users can access the constructor
>>>>> that initializes with the correct window size. I agree with your comment
>>>>> on the KIP-616 PR that the serde needs to be pre-configured when it's
>>>>> passed, but I'm not sure we would need a windowSize config. If the
>>>>> constructor is passed the serde and the window size, then the window
>>>>> size should be set within the deserializer. The only catch is if the
>>>>> Scala version of the consumer creates a new deserializer; at that point
>>>>> we'd need a window size config, but I'm not sure if that's the case.
>>>>>
>>>>> WDYT: is it possible to alter the existing implicit and add a new one?
>>>>>
>>>>> On Wed, Aug 26, 2020 at 10:00 AM John Roesler <vv...@apache.org>
>>>>> wrote:
>>>>>> Hi Leah,
>>>>>>
>>>>>> I was just reviewing the PR for KIP-616 and realized that we
>>>>>> forgot to mention the Scala API in your KIP. We should
>>>>>> consider it because `scala.Serdes.timeWindowedSerde` is
>>>>>> implicitly using the exact constructor you're deprecating.
>>>>>>
>>>>>> I had some ideas in the code review:
>>>>>> https://github.com/apache/kafka/pull/8955#discussion_r477358755
>>>>>>
>>>>>> What do you think is the best approach?
>>>>>>
>>>>>> Concretely, I think Yuriy can make the call for KIP-616 (for
>>>>>> the new implicit that he's adding). But I think your KIP-659
>>>>>> should mention how we modify the existing implicit.
>>>>>>
>>>>>> Typically, we'd try to avoid throwing new exceptions or
>>>>>> causing compile errors, so
>>>>>> * dropping the implicit is probably off the table (compile
>>>>>> error).
>>>>>> * throwing an exception in the deserializer may not be ok,
>>>>>> although it might still actually be ok since it's adding a
>>>>>> corruption check.
>>>>>> * logging an ERROR message and then passing through to the
>>>>>> underlying deserializer would be more conservative.
>>>>>>
>>>>>> What do you think we should do?
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>> On Fri, 2020-08-21 at 16:05 -0500, Leah Thomas wrote:
>>>>>>> Thanks for the typo catch, John.
>>>>>>>
>>>>>>> Let me know if anyone else has thoughts or ideas.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Leah
>>>>>>>
>>>>>>> On Fri, Aug 21, 2020 at 2:50 PM John Roesler <vvcephei@apache.org>
>>>>>>> wrote:
>>>>>>>> Thanks, all,
>>>>>>>>
>>>>>>>> Based on my reading of the conversation, it sounds like I
>>>>>>>> have some legwork to do in KIP-645, but our collective
>>>>>>>> instinct is that Leah's proposal doesn't need to change to
>>>>>>>> account for whatever we might decide to do in KIP-645.
>>>>>>>>
>>>>>>>> I have no further concerns about KIP-659, and I think it's a
>>>>>>>> good proposal.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> P.s., there's still a typo on the wiki that says
>>>>>>>> "ConsumerConfig" on the code block, even though the text now
>>>>>>>> says "StreamsConfig".
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, 2020-08-21 at 10:56 -0700, Sophie Blee-Goldman
>>>>>>>> wrote:
>>>>>>>>> Just want to make a quick comment on the question that John raised
>>>>>>>>> about whether we should introduce a separate config for "key" and
>>>>>>>>> "value" window sizes: my short answer is no, I don't think that's
>>>>>>>>> necessary. First of all, as you said, there is no first-class concept
>>>>>>>>> of a "Windowed value" in the DSL. Second, to engage with your
>>>>>>>>> rhetorical question: if there's no default window size for a Streams
>>>>>>>>> program, then how can there be a sensible default for the key AND a
>>>>>>>>> separate sensible default for a value? I don't think we need to
>>>>>>>>> follow the existing pattern if it doesn't make sense, and to be
>>>>>>>>> honest I'm a bit skeptical that anyone was even using these default
>>>>>>>>> windowed inner classes, since the config wasn't even
>>>>>>>>> defined/documented until quite recently. I'd actually be in favor of
>>>>>>>>> deprecating StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS,
>>>>>>>>> but I don't want to drag that into this discussion as well.
>>>>>>>>>
>>>>>>>>> My understanding is that these were meant to mirror the default
>>>>>>>>> key/value serde configs, but the real use of the
>>>>>>>>> DEFAULT_WINDOWED_SERDE_INNER_CLASS config is actually that you can at
>>>>>>>>> least use it to configure the inner class for a Consumer, thus making
>>>>>>>>> the TimeWindowed serdes functional at a basic level. With the window
>>>>>>>>> size configs, the point is not really to set a default but to make it
>>>>>>>>> actually work with a Consumer, which instantiates the deserializer by
>>>>>>>>> reflection. So I don't think we should position this new config as a
>>>>>>>>> "default" (although it may technically behave as one); within
>>>>>>>>> Streams, users can and should always supply the window size through
>>>>>>>>> the constructor. I don't think that's much of an inconvenience
>>>>>>>>> compared to the amount of confusion that has been (and will be)
>>>>>>>>> caused by default serde-related configs in Streams.
>>>>>>>>>
>>>>>>>>> Regarding the fixed vs. variable-sized config, one idea I had was to
>>>>>>>>> just keep the fixed-size config and constructor and let users of
>>>>>>>>> enumerable windows override the TimeWindowedSerde class(es) to do
>>>>>>>>> whatever it is they need. IIUC, you already have to override some
>>>>>>>>> other windows-related classes to get variable-sized windows, so doing
>>>>>>>>> the same for the serdes sounds reasonable to me. Just my take on the
>>>>>>>>> "simple things should be easy, difficult things should be possible"
>>>>>>>>> mantra.
>>>>>>>>>
>>>>>>>>> One last quick side note: the reason we don't really need to discuss
>>>>>>>>> SessionWindows here is that they already encode both the start and
>>>>>>>>> end time for the window. This is probably the best way to go for
>>>>>>>>> TimeWindows as well, but making this change in a backwards-compatible
>>>>>>>>> way is a much larger scope of work. And even then, we might want to
>>>>>>>>> consider making it possible to still encode just the start time to
>>>>>>>>> save space, thus requiring this config either way.
>>>>>>>>>
>>>>>>>>> On Fri, Aug 21, 2020 at 9:26 AM Leah Thomas <lthomas@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>> Thanks John and Walker for your thoughts.
>>>>>>>>>>
>>>>>>>>>> I agree with your two scenarios, John: either you configure the class
>>>>>>>>>> fully in the constructor and don't need to call `init()`, or you
>>>>>>>>>> configure it afterwards by calling `init()`. IIUC, if we pass the
>>>>>>>>>> deserializer to the consumer, we want to make sure it has the window
>>>>>>>>>> size set using the newly required constructor. If we don't pass in
>>>>>>>>>> the deserializer, the window size will be set through the configs. To
>>>>>>>>>> answer Walker's question directly: because the configs aren't passed
>>>>>>>>>> to the constructor, we can't set the window size unless we pass it to
>>>>>>>>>> the constructor or configure the deserializer after instantiating it.
>>>>>>>>>>
>>>>>>>>>> Users who would rather not set a strict window size (outside of the
>>>>>>>>>> variable-size scenario) can pass in Long.MAX_VALUE. The way I see it,
>>>>>>>>>> instead of having the default be for scenarios that don't require a
>>>>>>>>>> window size, we make the default the scenarios that *do*, flipping the
>>>>>>>>>> current implementation to fit typical use cases.
>>>>>>>>>>
>>>>>>>>>> On your points John:
>>>>>>>>>> 1. I agree that it makes sense to store it in StreamsConfig; this
>>>>>>>>>> shouldn't cause any issues. I've updated the KIP accordingly.
>>>>>>>>>>
>>>>>>>>>> 2. The non-fixed time windows issue is a good point. Calendar windows
>>>>>>>>>> in particular seem quite useful, so I think we want to make sure that
>>>>>>>>>> this wouldn't inhibit flexibly sized windows. Having two different
>>>>>>>>>> configs and functions makes sense to me, although it is slightly
>>>>>>>>>> messier. While requiring all time windows to use the WindowFunction
>>>>>>>>>> constructor would work, I think allowing users to access the
>>>>>>>>>> WindowSize constructor is preferable, because it seems easier to use
>>>>>>>>>> for people who are not at all interested in delving into variably
>>>>>>>>>> sized windows. This assumption could be wrong, though, and perhaps
>>>>>>>>>> users would adapt quickly to the new WindowFunction style, but my
>>>>>>>>>> immediate reaction is to support both configs and constructors.
>>>>>>>>>>
>>>>>>>>>> One note on this is that Session Windows are handled separately from
>>>>>>>>>> time windows and also have variable window sizes. I assume that the
>>>>>>>>>> TimeWindowed option is preferable for variably sized windows because
>>>>>>>>>> you still want to access the window end times? One alternative could
>>>>>>>>>> be separating the variably sized windows from the current
>>>>>>>>>> implementation of time windows, although I think KIP-645
>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
>>>>>>>>>> would make this not strictly necessary.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Leah
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 21, 2020 at 10:04 AM John Roesler <vvcephei@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>> Hi Leah,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the KIP! This has been a real pain for some use
>>>>>>>>>>> cases, so it's really good to see a proposal to fix it.
>>>>>>>>>>>
>>>>>>>>>>> We do need a default constructor so that it can be
>>>>>>>>>>> dynamically instantiated by the consumer (or any other
>>>>>>>>>>> component). But I'm +1 on deprecating the constructor
>>>>>>>>>>> you're proposing to deprecate, which only partially
>>>>>>>>>>> configures the class. It seems like there are exactly two
>>>>>>>>>>> patterns: either you fully configure the class in the
>>>>>>>>>>> constructor and don't call `init()`, or you call the default
>>>>>>>>>>> constructor and then configure the class by calling `init()`.
>>>>>>>>>>>
>>>>>>>>>>> I can appreciate Walker's point, but stepping back, it
>>>>>>>>>>> doesn't actually seem that useful to partially configure the
>>>>>>>>>>> class in the constructor and then finish up the
>>>>>>>>>>> configuration by calling `init()`. I could see the argument
>>>>>>>>>>> if there were a sensible default, but for this particular
>>>>>>>>>>> class, there isn't one. Rhetorical question: what is the
>>>>>>>>>>> default window size for Streams programs?
>>>>>>>>>>>
>>>>>>>>>>> I have a couple of concerns to discuss:
>>>>>>>>>>>
>>>>>>>>>>> 1. Config Location
>>>>>>>>>>>
>>>>>>>>>>> I don't think I would add the new configs to ConsumerConfig;
>>>>>>>>>>> I would add them to StreamsConfig instead. The deserializer
>>>>>>>>>>> itself is in Streams (it is
>>>>>>>>>>> o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
>>>>>>>>>>> odd to have one of its configurations in a completely
>>>>>>>>>>> different module.
>>>>>>>>>>>
>>>>>>>>>>> Also, this class already has two configs, which are in
>>>>>>>>>>> StreamsConfig:
>>>>>>>>>>> StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
>>>>>>>>>>> StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
>>>>>>>>>>>
>>>>>>>>>>> It seems like the new config belongs right next to the
>>>>>>>>>>> existing ones.
>>>>>>>>>>>
>>>>>>>>>>> For me, it raises a secondary question:
>>>>>>>>>>> 1b: Should there be a KEY_WINDOW_SIZE and a
>>>>>>>>>>> VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
>>>>>>>>>>> value" even is, but the fact that we can configure serdes
>>>>>>>>>>> for it implies that perhaps we should symmetrically
>>>>>>>>>>> configure its size as well.
>>>>>>>>>>>
>>>>>>>>>>> 2. Fixed Size Assumption
>>>>>>>>>>>
>>>>>>>>>>> In KIP-645, I'm proposing to lift the assumption that
>>>>>>>>>>> TimeWindows have a fixed size at all, but KIP-659 is
>>>>>>>>>>> currently built on that assumption.
>>>>>>>>>>>
>>>>>>>>>>> For details on why this is not a good assumption, see:
>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10408
>>>>>>>>>>>
>>>>>>>>>>> In fact, in my POC PR for KIP-645, I'm dropping the
>>>>>>>>>>> constructor that takes a "window size" parameter in favor of
>>>>>>>>>>> one that takes a window function, mapping a window start
>>>>>>>>>>> time to a full Window(start, end).
>>>>>>>>>>>
>>>>>>>>>>> In that context, it seems incongruous to introduce a
>>>>>>>>>>> configuration that specifies a window size. Of course, my
>>>>>>>>>>> KIP is also under discussion, so my proposal may not
>>>>>>>>>>> eventually be accepted. But it is necessary to consider both
>>>>>>>>>>> of these concerns together.
>>>>>>>>>>>
>>>>>>>>>>> One option seems to be to accept both. Namely, we keep the
>>>>>>>>>>> "fixed size" constructor AND add my new constructor (for
>>>>>>>>>>> variably sized windows). Likewise, we accept your proposal,
>>>>>>>>>>> and KIP-659 would propose to add a new config specifying a
>>>>>>>>>>> windowing function, such as:
>>>>>>>>>>>
>>>>>>>>>>>> StreamsConfig.WINDOW_FUNCTION_CONFIG
>>>>>>>>>>>
>>>>>>>>>>> which would be an instance of:
>>>>>>>>>>>
>>>>>>>>>>>> public interface WindowFunction extends Function<Long, Window> {}
>>>>>>>>>>>
>>>>>>>>>>> I'm not bringing these up for discussion in your KIP right
>>>>>>>>>>> now, just demonstrating the feasibility of merging both
>>>>>>>>>>> proposals.
>>>>>>>>>>>
>>>>>>>>>>> My question for you: do you think the general strategy of
>>>>>>>>>>> having two constructors and two configs, one for fixed and
>>>>>>>>>>> one for variable windows, makes sense? Is it too
>>>>>>>>>>> complicated? Do you have a better idea?
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>> -John
>>>>>>>>>>>
>>>>>>>>>>> On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
>>>>>>>>>>>> Hi Leah,
>>>>>>>>>>>>
>>>>>>>>>>>> Could you explain a bit more why we do not wish to
>>>>>>>>>>>> let TimeWindowedDeserializer and WindowedSerdes be created without a
>>>>>>>>>>>> specified time as a parameter?
>>>>>>>>>>>>
>>>>>>>>>>>> I understand that Long.MAX_VALUE could cause problems, but would it
>>>>>>>>>>>> not be a good idea to have a usable default, or to fetch the value
>>>>>>>>>>>> from the config if available? After all, you are proposing to add
>>>>>>>>>>>> "window.size.ms".
>>>>>>>>>>>>
>>>>>>>>>>>> We definitely need a fix to this problem, and adding
>>>>>>>>>>>> "window.size.ms" makes sense to me.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP,
>>>>>>>>>>>> Walker
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <lthomas@confluent.io>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'd like to start a discussion for KIP-659:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
>>>>>>>>>>>>>
>>>>>>>>>>>>> The goal of the KIP is to ensure that window size is passed to the
>>>>>>>>>>>>> consumer when needed, which will generally be for testing purposes,
>>>>>>>>>>>>> and to avoid runtime errors when the *TimeWindowedSerde* is created
>>>>>>>>>>>>> without a window size.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Looking forward to hearing your feedback.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Leah
>>>>>>>>>>>>>
>>
>>
> 

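John's argument for keeping the config is that a tool like the console consumer builds the deserializer from a class name in a properties file, so there is no constructor call through which to pass a window size. The small, Kafka-free Java sketch below illustrates that point. The class names are hypothetical. `value.deserializer` mirrors the consumer's real config key for the value deserializer class, and `window.size.ms` is the name proposed on the thread. The reflective pattern is standard Java, but this is not the consumer's actual loading code.

```java
import java.util.Map;

// Hypothetical deserializer stand-in (not a Kafka class). Because a
// properties-driven tool only calls the no-arg constructor, the window
// size can only arrive through configure().
final class ReflectiveWindowDecoder {
    private long windowSizeMs = -1; // -1 means "not configured yet"

    public ReflectiveWindowDecoder() {}

    public void configure(Map<String, ?> configs) {
        // Config key as proposed on the thread (assumption of this sketch).
        Object size = configs.get("window.size.ms");
        if (size == null) {
            throw new IllegalStateException("window.size.ms is required");
        }
        windowSizeMs = Long.parseLong(size.toString());
    }

    public long windowSizeMs() {
        return windowSizeMs;
    }
}

// Hypothetical loader that mimics a tool holding only a properties map:
// a class name plus string configs.
final class ReflectiveLoaderSketch {
    static ReflectiveWindowDecoder load(Map<String, String> props) {
        try {
            // Instantiate by class name through the no-arg constructor.
            Class<?> cls = Class.forName(props.get("value.deserializer"));
            ReflectiveWindowDecoder decoder =
                (ReflectiveWindowDecoder) cls.getDeclaredConstructor().newInstance();
            // No constructor argument was possible, so the window size
            // must come from the same configs.
            decoder.configure(props);
            return decoder;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot instantiate deserializer", e);
        }
    }
}
```

If `window.size.ms` is missing from the properties, `load` fails with an `IllegalStateException`. An instance created by reflection has no other channel for receiving its window size, which is why the thread keeps the config alongside the new constructor.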

Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Posted by Leah Thomas <lt...@confluent.io>.
Hey John,

I see what you're saying about the console consumer in particular. I don't
think that adding the extra config would *hurt* at all, so I'm good with
keeping that in the KIP. I've re-updated the KIP proposal to include the
configs.

The serde resolution sounds good to me as well; I added a few lines to the
KIP about logging an error when the *timeWindowedSerde* implicit is called.

Let me know if there are any other concerns; otherwise I'll resume voting.

Cheers,
Leah
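The thread discusses two configuration paths. In the first, the deserializer is fully configured in its constructor. In the second, a no-arg constructor is called and the window size is supplied afterwards through a config map. The minimal, Kafka-free Java sketch below models both paths. Everything in it is hypothetical and for illustration only. `WindowedKeyDecoderSketch` is not the real `TimeWindowedDeserializer`, and `window.size.ms` is simply the config name proposed on the thread. The 8-byte big-endian start-time encoding and the clamp to `Long.MAX_VALUE` on overflow are also assumptions of this sketch.

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical, Kafka-free model of a time-windowed key deserializer.
// It is NOT org.apache.kafka.streams.kstream.TimeWindowedDeserializer.
final class WindowedKeyDecoderSketch {
    // Config name proposed on the thread; assumed to hold milliseconds.
    static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";

    private Long windowSizeMs; // null until set by a constructor or configure()

    // Pattern 1: fully configured up front, so configure() becomes a no-op.
    WindowedKeyDecoderSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Pattern 2: no-arg constructor (e.g. for reflective instantiation),
    // followed by configure() with a config map.
    WindowedKeyDecoderSketch() {}

    void configure(Map<String, ?> configs) {
        if (windowSizeMs != null) {
            return; // already fully configured via the constructor
        }
        Object value = configs.get(WINDOW_SIZE_MS_CONFIG);
        if (value == null) {
            throw new IllegalStateException(WINDOW_SIZE_MS_CONFIG + " must be set");
        }
        windowSizeMs = Long.parseLong(value.toString());
    }

    // Reads an 8-byte window start (assumed encoding) and returns {start, end}.
    long[] deserializeWindow(byte[] data) {
        if (windowSizeMs == null) {
            throw new IllegalStateException("window size was never configured");
        }
        long start = ByteBuffer.wrap(data).getLong();
        long end = start + windowSizeMs;
        if (end < start) {
            // Overflow, e.g. when Long.MAX_VALUE is used as the window size.
            end = Long.MAX_VALUE;
        }
        return new long[] {start, end};
    }
}
```

Under these assumptions, an instance built with `new WindowedKeyDecoderSketch(60_000L)` never needs `configure()`. A no-arg instance works only after `configure(Map.of("window.size.ms", "60000"))` has run. Calling `deserializeWindow` before the size is known fails fast rather than guessing a default, in line with the thread's point that there is no sensible default window size.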

On Tue, Sep 1, 2020 at 11:17 AM John Roesler <vv...@apache.org> wrote:

> Hi Leah and Sophie,
>
> Sorry for the delayed response.
>
> You can pass in pre-instantiated (and therefore arbitrarily
> constructed) deserializers to the KafkaConsumer. However,
> this doesn't mean we should drop the configs. The same
> argument for dropping the configs implies that the consumer
> shouldn't have configs for setting the deserializers at all.
> This doesn't sound right, and I'm asking myself why. The
> most likely answer seems to me to be that you sometimes
> create a Consumer without invoking the Java constructor at
> all, for example when you use the console consumer. In that
> case, it would be indispensable to be able to fully
> configure the deserializers via a properties file.
>
> Therefore, I think we should go ahead and propose the new
> config. (Sorry for the flip-flop, Leah)
>
> Regarding the implicits, Leah's conclusion sounds good to
> me. Yuriy is not adding any implicit for this serde to the
> new class, and we'll just add an ERROR log to the existing
> implicit. Once KIP-616 is merged, the existing implicit will
> be deprecated along with all the other implicits in that
> class, so there will be two "forces" pushing people to the
> new interface, where they will discover the lack of an
> implicit, which then forces them to call the non-deprecated
> constructors directly.
>
> To answer Sophie's question, "implicit" is a feature of
> Scala that allows the type system to automatically resolve
> method arguments when there is just one possible argument in
> scope. There's a bunch of docs for it, so I won't waste a
> ton of e-ink on the details; the docs will be crystal clear
> just assuming you know all about monads and monoids and
> type-level programming ;)
>
> The punch line for us is that we provide implicits for the
> basic serdes, and also for turning pairs of
> serializers/deserializers into serdes, so you can avoid
> explicitly passing any serdes into Streams DSL operations,
> but also not have to fall back on the default key/value
> serde configs. Instead, the type system will plug in the
> right serde for the K/V types at each operation.
>
> We would _not_ add an implicit for a serde that we can't
> construct in a context-free way using just type information,
> as in this case. That's why Yuriy dropped the new implicit
> and why we're going to add an error to the existing
> implicit. On the other hand, removing the existing implicit
> will cause compiler errors when the type system is no longer
> able to find a suitable argument for an implicit parameter,
> so we don't want to just remove the existing implicit.
>
> Thanks,
> -John
>
> On Mon, 2020-08-31 at 16:28 -0500, Leah Thomas wrote:
> > Hey Sophie,
> >
> > Thanks for the catch! It makes sense that the consumer would accept a
> > deserializer somewhere, so we can definitely skip the additional
> > configs. I updated the KIP to reflect that.
> >
> > John seems to know Scala better than I do as well, but I think we need to
> > keep the current implicit that allows users to just pass in a serde and no
> > window size, for backwards compatibility. Based on the discussion around
> > KIP-616 <https://github.com/apache/kafka/pull/8955>, it seems to me that we
> > can pretty easily do John's third suggestion for handling this implicit:
> > logging an error message and passing to a non-deprecated constructor using
> > some default value. It also seems from KIP-616 that most Scala users will
> > use the new Serdes class anyway, and Yuriy is just removing these
> > implicits, so it seems like whatever fix we decide on for this class won't
> > get used too heavily.
> >
> > Cheers,
> > Leah
> >
> > On Thu, Aug 27, 2020 at 8:49 PM Sophie Blee-Goldman <sophie@confluent.io>
> > wrote:
> >
> > > Ok, I'm definitely feeling pretty dumb now, but I was just thinking how
> > > ridiculous it is that the Consumer forces you to configure your
> > > Deserializer through actual config maps instead of just taking the ones
> > > you pass in directly. So I thought "why not just fix the Consumer to allow
> > > passing in an actual Deserializer object" and went through the code in
> > > case there's some legitimate reason why not, and what do you know: you
> > > actually can pass in an actual Deserializer object! There is a
> > > KafkaConsumer constructor that accepts a key and value Deserializer, and
> > > it doesn't instantiate or configure a new one if they are provided in
> > > this way. Duh.
> > >
> > > Sorry for misleading everyone on that front. I'm just happy to find out
> > > that a reasonable way of configuring the deserializer actually *is*
> > > possible after all. In that case, maybe we can remove the extra configs
> > > from this KIP and just proceed with the deprecation?
> > >
> > > Obviously that doesn't help with the remaining question that John/Leah
> > > have posed. I probably don't have anything valuable to offer there, since
> > > I know next to nothing about Scala, but I do want to better understand:
> > > why would we add an "implicit" (what exactly does this mean?) that relies
> > > on allowing users to not set the windowSize, if we are explicitly taking
> > > away that option from the Java users? Or, if we have already added
> > > something, can't we just deprecate it like we are deprecating the Java
> > > constructor? I may need some remedial lessons in Scala just to understand
> > > the problem that we apparently have, because I don't get it.
> > >
> > > By the way, I'm a little tempted to say that we should go one step
> > > further and deprecate the DEFAULT_WINDOWED_INNER_CLASS configs, but maybe
> > > that's a bit too radical for the moment. It just seems like default serde
> > > configs have been a lot more trouble than they're worth overall. That
> > > said, these particular configs don't appear to have hurt anyone thus far,
> > > at least not that we know of (possibly because no one is using them
> > > anyway), so there's no strong motivation to do so.
> > >
> > > On Wed, Aug 26, 2020 at 9:19 AM Leah Thomas <lt...@confluent.io>
> > > wrote:
> > >
> > > > Hey John,
> > > >
> > > > Thanks for pointing this out; I wasn't sure how to handle the Scala
> > > > changes.
> > > >
> > > > I'm not fully versed in the Scala version of Streams, so feel free to
> > > > correct me if any of my assumptions are wrong. From my point of view, the
> > > > simplest fix seems to be logging an error message and then calling the
> > > > constructor that requires a windowSize. So instead of calling
> > > > `TimeWindowedSerde(final Serde<T> inner)`, we could call
> > > > `TimeWindowedSerde(final Serde<T> inner, final long windowSize)` with
> > > > Long.MAX_VALUE as the window size.
> > > >
> > > > I do feel like we would want to add an implicit to `Serdes.scala` that
> > > > takes a serde and a window size, so that users can access the constructor
> > > > that initializes with the correct window size. I agree with your comment
> > > > on the KIP-616 PR that the serde needs to be pre-configured when it's
> > > > passed, but I'm not sure we would need a windowSize config. If the
> > > > constructor is passed the serde and the window size, then the window
> > > > size should be set within the deserializer. The only catch is if the
> > > > Scala version of the consumer creates a new deserializer; at that point
> > > > we'd need a window size config, but I'm not sure if that's the case.
> > > >
> > > > WDYT: is it possible to alter the existing implicit and add a new one?
> > > >
> > > > On Wed, Aug 26, 2020 at 10:00 AM John Roesler <vv...@apache.org>
> > > > wrote:
> > > > > Hi Leah,
> > > > >
> > > > > I was just reviewing the PR for KIP-616 and realized that we
> > > > > forgot to mention the Scala API in your KIP. We should
> > > > > consider it because `scala.Serdes.timeWindowedSerde` is
> > > > > implicitly using the exact constructor you're deprecating.
> > > > >
> > > > > I had some ideas in the code review:
> > > > > https://github.com/apache/kafka/pull/8955#discussion_r477358755
> > > > >
> > > > > What do you think is the best approach?
> > > > >
> > > > > Concretely, I think Yuriy can make the call for KIP-616 (for
> > > > > the new implicit that he's adding). But I think your KIP-659
> > > > > should mention how we modify the existing implicit.
> > > > >
> > > > > Typically, we'd try to avoid throwing new exceptions or
> > > > > causing compile errors, so
> > > > > * dropping the implicit is probably off the table (compile
> > > > > error).
> > > > > * throwing an exception in the deserializer may not be ok,
> > > > > although it might still actually be ok since it's adding a
> > > > > corruption check.
> > > > > * logging an ERROR message and then passing through to the
> > > > > underlying deserializer would be more conservative.
> > > > >
> > > > > What do you think we should do?
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > On Fri, 2020-08-21 at 16:05 -0500, Leah Thomas wrote:
> > > > > > Thanks for the typo catch, John.
> > > > > >
> > > > > > Let me know if anyone else has thoughts or ideas.
> > > > > >
> > > > > > Cheers,
> > > > > > Leah
> > > > > >
> > > > > > On Fri, Aug 21, 2020 at 2:50 PM John Roesler <vvcephei@apache.org>
> > > > > > wrote:
> > > > > > > Thanks, all,
> > > > > > >
> > > > > > > Based on my reading of the conversation, it sounds like I
> > > > > > > have some legwork to do in KIP-645, but our collective
> > > > > > > instinct is that Leah's proposal doesn't need to change to
> > > > > > > account for whatever we might decide to do in KIP-645.
> > > > > > >
> > > > > > > I have no further concerns about KIP-659, and I think it's a
> > > > > > > good proposal.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > -John
> > > > > > >
> > > > > > > P.s., there's still a typo on the wiki that says
> > > > > > > "ConsumerConfig" on the code block, even though the text now
> > > > > > > says "StreamsConfig".
> > > > > > >
> > > > > > >
> > > > > > > On Fri, 2020-08-21 at 10:56 -0700, Sophie Blee-Goldman
> > > > > > > wrote:
> > > > > > > > Just want to make a quick comment on the question that John raised
> > > > > > > > about whether we should introduce a separate config for "key" and
> > > > > > > > "value" window sizes: my short answer is no, I don't think that's
> > > > > > > > necessary. First of all, as you said, there is no first-class concept
> > > > > > > > of a "Windowed value" in the DSL. Second, to engage with your
> > > > > > > > rhetorical question: if there's no default window size for a Streams
> > > > > > > > program, then how can there be a sensible default for the key AND a
> > > > > > > > separate sensible default for a value? I don't think we need to
> > > > > > > > follow the existing pattern if it doesn't make sense, and to be
> > > > > > > > honest I'm a bit skeptical that anyone was even using these default
> > > > > > > > windowed inner classes, since the config wasn't even
> > > > > > > > defined/documented until quite recently. I'd actually be in favor of
> > > > > > > > deprecating StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS,
> > > > > > > > but I don't want to drag that into this discussion as well.
> > > > > > > >
> > > > > > > > My understanding is that these were meant to mirror the default
> > > > > > > > key/value serde configs, but the real use of the
> > > > > > > > DEFAULT_WINDOWED_SERDE_INNER_CLASS config is actually that you can at
> > > > > > > > least use it to configure the inner class for a Consumer, thus making
> > > > > > > > the TimeWindowed serdes functional at a basic level. With the window
> > > > > > > > size configs, the point is not really to set a default but to make it
> > > > > > > > actually work with a Consumer, which instantiates the deserializer by
> > > > > > > > reflection. So I don't think we should position this new config as a
> > > > > > > > "default" (although it may technically behave as one); within
> > > > > > > > Streams, users can and should always supply the window size through
> > > > > > > > the constructor. I don't think that's much of an inconvenience
> > > > > > > > compared to the amount of confusion that has been (and will be)
> > > > > > > > caused by default serde-related configs in Streams.
> > > > > > > >
> > > > > > > > Regarding the fixed vs variable sized config, one idea I had
> was
> > > to
> > > > > just
> > > > > > > > keep the fixed-size config
> > > > > > > > and constructor and let users of enumerable windows override
> the
> > > > > > > > TimeWindowedSerde class(es)
> > > > > > > > to do whatever it is they need. IIUC you already have to
> override
> > > > > some
> > > > > > > > other windows-related
> > > > > > > > classes to get variable-sized windows so doing the same for
> the
> > > > > serdes
> > > > > > > > sounds reasonable to me.
> > > > > > > > Just my take on the "simple things should be easy, difficult
> > > things
> > > > > > > should
> > > > > > > > be possible" mantra
> > > > > > > >
> > > > > > > > One last quick side note: the reason we don't really need to
> > > > discuss
> > > > > > > > SessionWindows here
> > > > > > > > is that they already encode both the start and end time for
> the
> > > > > window.
> > > > > > > > This is probably the best
> > > > > > > > way to go for TimeWindows as well, but making this change in
> a
> > > > > backwards
> > > > > > > > compatible way is a
> > > > > > > > much larger scope of work. And even then, we might want to
> > > consider
> > > > > > > making
> > > > > > > > it possible to still
> > > > > > > > just encode the start time to save space, thus requiring this
> > > > config
> > > > > > > either
> > > > > > > > way
> > > > > > > >
> > > > > > > > On Fri, Aug 21, 2020 at 9:26 AM Leah Thomas <lthomas@confluent.io> wrote:
> > > > > > > > > Thanks John and Walker for your thoughts.
> > > > > > > > >
> > > > > > > > > I agree with your two scenarios, John: either you configure fully in
> > > > > > > > > the constructor and don't need to call `init()`, or you call the
> > > > > > > > > default constructor and then `init()`. IIUC, if we pass the
> > > > > > > > > deserializer to the consumer, we want to make sure it has the window
> > > > > > > > > size set using the newly required constructor. If we don't pass in
> > > > > > > > > the deserializer, the window size will be set through the configs. To
> > > > > > > > > answer Walker's question directly, because the configs aren't passed
> > > > > > > > > to the constructor, we can't set the window size unless we pass it to
> > > > > > > > > the constructor or configure the deserializer after instantiating it.
> > > > > > > > >
> > > > > > > > > For users who would rather not set a strict window size (outside of
> > > > > > > > > the variable size scenario), they can pass in Long.MAX_VALUE. The way
> > > > > > > > > I see this is instead of having the default be for scenarios that
> > > > > > > > > don't require a window size, we have the default be the scenarios
> > > > > > > > > that *do*, flipping the current implementation to fit with typical
> > > > > > > > > use cases.
> > > > > > > > >
> > > > > > > > > On your points John:
> > > > > > > > > 1. I agree that it makes sense to store it in StreamsConfig; this
> > > > > > > > > shouldn't cause any issues. I've updated the KIP accordingly.
> > > > > > > > >
> > > > > > > > > 2. The non-fixed time windows issue is a good point. It seems like
> > > > > > > > > calendar windows in particular are quite useful, so I think we want
> > > > > > > > > to make sure that this wouldn't inhibit flexibly sized windows. I
> > > > > > > > > think having two different configs and functions makes sense,
> > > > > > > > > although it is slightly messier. While requiring all time windows to
> > > > > > > > > use the WindowFunction constructor would work, I think that allowing
> > > > > > > > > users to access the WindowSize constructor is preferable because it
> > > > > > > > > seems easier to use for people who are not at all interested in
> > > > > > > > > delving into variably sized windows. This assumption could be wrong
> > > > > > > > > though, and perhaps users would adapt quickly to the new
> > > > > > > > > WindowFunction style, but my immediate reaction is to support both
> > > > > > > > > configs and constructors.
> > > > > > > > >
> > > > > > > > > One note on this is that Session Windows are handled separately from
> > > > > > > > > time windows and also have variable window sizes. I assume that the
> > > > > > > > > TimeWindowed option is preferable for variably sized windows because
> > > > > > > > > you still want to access the window end times? But I think one
> > > > > > > > > alternative could be separating the variably sized windows from the
> > > > > > > > > current implementation of time windows, although I think KIP-645
> > > > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> > > > > > > > > would make this not strictly necessary.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Leah
> > > > > > > > >
> > > > > > > > > On Fri, Aug 21, 2020 at 10:04 AM John Roesler <vvcephei@apache.org> wrote:
> > > > > > > > > > Hi Leah,
> > > > > > > > > >
> > > > > > > > > > Thanks for the KIP! This has been a real pain for some use
> > > > > > > > > > cases, so it's really good to see a proposal to fix it.
> > > > > > > > > >
> > > > > > > > > > We do need a default constructor so that it can be
> > > > > > > > > > dynamically instantiated by the consumer (or any other
> > > > > > > > > > component). But I'm +1 on deprecating the constructor you're
> > > > > > > > > > proposing to deprecate, which only partially configures the
> > > > > > > > > > class. It seems like there are exactly two patterns: either
> > > > > > > > > > you fully configure the class in the constructor and don't
> > > > > > > > > > call `init()`, or you call the default constructor and then
> > > > > > > > > > configure the class by calling `init()`.
> > > > > > > > > >
> > > > > > > > > > I can appreciate Walker's point, but stepping back, it
> > > > > > > > > > doesn't actually seem that useful to partially configure the
> > > > > > > > > > class in the constructor and then finish up the
> > > > > > > > > > configuration by calling `init()`. I could see the argument
> > > > > > > > > > if there were a sensible default, but for this particular
> > > > > > > > > > class, there isn't one. Rhetorical question: what is the
> > > > > > > > > > default window size for Streams programs?
> > > > > > > > > >
> > > > > > > > > > I have a couple of concerns to discuss:
> > > > > > > > > >
> > > > > > > > > > 1. Config Location
> > > > > > > > > >
> > > > > > > > > > I don't think I would add the new configs to ConsumerConfig,
> > > > > > > > > > but would add them to StreamsConfig instead. The deserializer
> > > > > > > > > > itself is in Streams (it is
> > > > > > > > > > o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
> > > > > > > > > > odd to have one of its configurations in a completely
> > > > > > > > > > different module.
> > > > > > > > > >
> > > > > > > > > > Also, this class already has two configs, which are in
> > > > > > > > > > StreamsConfig:
> > > > > > > > > > StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
> > > > > > > > > > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> > > > > > > > > >
> > > > > > > > > > It seems like the new config belongs right next to the
> > > > > > > > > > existing ones.
> > > > > > > > > >
> > > > > > > > > > For me, it raises a secondary question:
> > > > > > > > > > 1b: Should there be a KEY_WINDOW_SIZE and a
> > > > > > > > > > VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
> > > > > > > > > > value" even is, but the fact that we can configure serdes
> > > > > > > > > > for it implies that perhaps we should symmetrically
> > > > > > > > > > configure its size as well.
> > > > > > > > > >
> > > > > > > > > > 2. Fixed Size Assumption
> > > > > > > > > >
> > > > > > > > > > In KIP-645, I'm proposing to lift the assumption that
> > > > > > > > > > TimeWindows have a fixed size at all, but KIP-659 is
> > > > > > > > > > currently built on that assumption.
> > > > > > > > > >
> > > > > > > > > > For details on why this is not a good assumption, see:
> > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-10408
> > > > > > > > > >
> > > > > > > > > > In fact, in my POC PR for KIP-645, I'm dropping the
> > > > > > > > > > constructor that takes a "window size" parameter in favor of
> > > > > > > > > > one that takes a window function, mapping a window start
> > > > > > > > > > time to a full Window(start, end).
> > > > > > > > > >
> > > > > > > > > > In that context, it seems incongruous to introduce a
> > > > > > > > > > configuration that specifies a window size. Of course, my
> > > > > > > > > > KIP is also under discussion, so my proposal may not
> > > > > > > > > > eventually be accepted. But it is necessary to consider both
> > > > > > > > > > of these concerns together.
> > > > > > > > > >
> > > > > > > > > > One option seems to be to accept both. Namely, we keep the
> > > > > > > > > > "fixed size" constructor AND add my new constructor (for
> > > > > > > > > > variably sized windows). Likewise, we accept your proposal,
> > > > > > > > > > and KIP-645 would propose to add a new config specifying a
> > > > > > > > > > windowing function, such as:
> > > > > > > > > >
> > > > > > > > > > > StreamsConfig.WINDOW_FUNCTION_CONFIG
> > > > > > > > > >
> > > > > > > > > > which would be an instance of:
> > > > > > > > > >
> > > > > > > > > > > public interface WindowFunction extends Function<Long, Window> {}
> > > > > > > > > >
> > > > > > > > > > I'm not bringing these up for discussion in your KIP right
> > > > > > > > > > now, just demonstrating the feasibility of merging both
> > > > > > > > > > proposals.
> > > > > > > > > >
> > > > > > > > > > My question for you: do you think the general strategy of
> > > > > > > > > > having two constructors and two configs, one for fixed and
> > > > > > > > > > one for variable windows, makes sense? Is it too
> > > > > > > > > > complicated? Do you have a better idea?
> > > > > > > > > >
> > > > > > > > > > Thanks!
> > > > > > > > > > -John
> > > > > > > > > >
> > > > > > > > > > On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> > > > > > > > > > > Hi Leah,
> > > > > > > > > > >
> > > > > > > > > > > Could you explain a bit more why we do not wish to
> > > > > > > > > > > let TimeWindowedDeserializer and WindowedSerdes be created
> > > > > > > > > > > without a specified window size as a parameter?
> > > > > > > > > > >
> > > > > > > > > > > I understand the Long.MAX_VALUE could cause problems, but would it
> > > > > > > > > > > not be a good idea to have a usable default or fetch from the
> > > > > > > > > > > config if available? After all, you are proposing to add
> > > > > > > > > > > "window.size.ms".
> > > > > > > > > > >
> > > > > > > > > > > We definitely need a fix to this problem and adding
> > > > > > > > > > > "window.size.ms" makes sense to me.
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the KIP,
> > > > > > > > > > > Walker
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <lthomas@confluent.io> wrote:
> > > > > > > > > > > > Hi all,
> > > > > > > > > > > >
> > > > > > > > > > > > I'd like to start a discussion for KIP-659:
> > > > > > > > > > > >
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > > > > > > > > > > >
> > > > > > > > > > > > The goal of the KIP is to ensure that window size is passed to
> > > > > > > > > > > > the consumer when needed, which will generally be for testing
> > > > > > > > > > > > purposes, and to avoid runtime errors when the
> > > > > > > > > > > > *TimeWindowedSerde* is created without a window size.
> > > > > > > > > > > >
> > > > > > > > > > > > Looking forward to hearing your feedback.
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > > Leah
> > > > > > > > > > > >
>
>
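The window-function idea floated in the thread (a function that maps a window start time to a full `Window(start, end)`) can be sketched in plain Java. This is a hypothetical illustration, not Kafka's API: only the `WindowFunction` name comes from the discussion, while `SimpleWindow`, `fixedSize`, and `calendarMonthUtc` are invented here, and windows are assumed to be start-inclusive and end-exclusive.

```java
import java.time.Instant;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.util.function.Function;

// Hypothetical window value: start inclusive, end exclusive (not Kafka's Window class).
final class SimpleWindow {
    final long startMs;
    final long endMs;

    SimpleWindow(long startMs, long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// Maps a window start time (epoch millis) to the full window that begins there.
interface WindowFunction extends Function<Long, SimpleWindow> {

    // Fixed-size windows: the end is always start + size.
    static WindowFunction fixedSize(long sizeMs) {
        return startMs -> new SimpleWindow(startMs, startMs + sizeMs);
    }

    // Variable-size windows: each window ends at the first instant (UTC)
    // of the month after the one containing its start time.
    static WindowFunction calendarMonthUtc() {
        return startMs -> {
            YearMonth month = YearMonth.from(Instant.ofEpochMilli(startMs).atZone(ZoneOffset.UTC));
            long endMs = month.plusMonths(1)
                    .atDay(1)
                    .atStartOfDay(ZoneOffset.UTC)
                    .toInstant()
                    .toEpochMilli();
            return new SimpleWindow(startMs, endMs);
        };
    }
}
```

A fixed-size window reduces to `end = start + size`, whereas a calendar-month window has no single size (February 2020 spans 29 days, March 2020 spans 31), which is why one window-size setting alone cannot describe it.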

Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Posted by John Roesler <vv...@apache.org>.
Hi Leah and Sophie,

Sorry for the delayed response.

You can pass in pre-instantiated (and therefore arbitrarily
constructed) deserializers to the KafkaConsumer. However,
this doesn't mean we should drop the configs. The same
argument for dropping the configs implies that the consumer
shouldn't have configs for setting the deserializers at all.
This doesn't sound right, and I'm asking myself why. The
most likely answer seems to me to be that you sometimes
create a Consumer without invoking the Java constructor at
all. For example, when you use the console-consumer. In that
case, it would be indispensable to be able to fully
configure the deserializers via a properties file.
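The reflection-based path described above can be sketched in plain Java. This is a simplified illustration under stated assumptions, not Kafka code: `SimpleDeserializer`, `WindowSizeAwareDeserializer`, and `ReflectiveClient` are hypothetical stand-ins, the `configure(Map, boolean)` lifecycle only loosely mirrors Kafka's `Deserializer`, and the `window.size.ms` key is taken from the thread rather than from a released API. A client that knows only a class name has to use the no-arg constructor and can supply the window size only through the config map, while a caller that constructs the object itself can pass the size directly.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal interface with a configure-then-use lifecycle,
// loosely modelled on Kafka's Deserializer; not the real Kafka interface.
interface SimpleDeserializer<T> {
    void configure(Map<String, ?> configs, boolean isKey);

    T deserialize(byte[] data);
}

// Reads an 8-byte window start time and returns the window end time,
// which it can only compute once a window size is known.
final class WindowSizeAwareDeserializer implements SimpleDeserializer<Long> {
    // Key name taken from the thread ("window.size.ms"); an assumption here.
    static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";

    private Long windowSizeMs; // null until set by the constructor or configure()

    // No-arg constructor so a client can instantiate the class by reflection.
    WindowSizeAwareDeserializer() {
    }

    // Fully configured up front; configure() will not override this value.
    WindowSizeAwareDeserializer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Object value = configs.get(WINDOW_SIZE_MS_CONFIG);
        if (windowSizeMs == null && value != null) {
            windowSizeMs = Long.parseLong(value.toString());
        }
    }

    @Override
    public Long deserialize(byte[] data) {
        if (windowSizeMs == null) {
            throw new IllegalStateException(WINDOW_SIZE_MS_CONFIG + " was never set");
        }
        long startMs = ByteBuffer.wrap(data).getLong();
        return startMs + windowSizeMs;
    }
}

// Mimics a tool (e.g. a console consumer) that only has a class name and a config map.
final class ReflectiveClient {
    static SimpleDeserializer<?> create(String className, Map<String, Object> configs) {
        try {
            SimpleDeserializer<?> deserializer = (SimpleDeserializer<?>) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
            deserializer.configure(configs, true);
            return deserializer;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }

    // Example of the config map a properties file might produce.
    static Map<String, Object> exampleConfigs(long windowSizeMs) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(WindowSizeAwareDeserializer.WINDOW_SIZE_MS_CONFIG, Long.toString(windowSizeMs));
        return configs;
    }
}
```

In this sketch a size given to the constructor wins over the config entry; a real implementation might instead reject conflicting values.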

Therefore, I think we should go ahead and propose the new
config. (Sorry for the flip-flop, Leah)

Regarding the implicits, Leah's conclusion sounds good to
me. Yuriy is not adding any implicit for this serde to the
new class, and we'll just add an ERROR log to the existing
implicit. Once KIP-616 is merged, the existing implicit will
be deprecated along with all the other implicits in that
class, so there will be two "forces" pushing people to the
new interface, where they will discover the lack of an
implicit, which then forces them to call the non-deprecated
constructors directly.
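The "log an ERROR and pass through" fallback for the legacy path that has no window size can be sketched as follows. It is a hypothetical Java stand-in, not the Scala implicit or Kafka's `TimeWindowedSerde`: `FallbackWindowedSerde` and `withoutWindowSize` are invented for illustration, `java.util.logging` reports errors at the `SEVERE` level, and `Long.MAX_VALUE` is the placeholder size suggested elsewhere in this thread.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for a time-windowed serde; not Kafka's TimeWindowedSerde.
final class FallbackWindowedSerde {
    private static final Logger LOG = Logger.getLogger(FallbackWindowedSerde.class.getName());

    final String innerSerdeName;
    final long windowSizeMs;

    // Supported path: the caller states the window size explicitly.
    FallbackWindowedSerde(String innerSerdeName, long windowSizeMs) {
        this.innerSerdeName = innerSerdeName;
        this.windowSizeMs = windowSizeMs;
    }

    // Legacy path kept so existing callers still compile: log an error
    // (SEVERE in java.util.logging) and fall back to an unbounded size.
    @Deprecated
    static FallbackWindowedSerde withoutWindowSize(String innerSerdeName) {
        LOG.severe("No window size given for windowed serde over " + innerSerdeName
                + "; falling back to Long.MAX_VALUE. Pass the window size explicitly.");
        return new FallbackWindowedSerde(innerSerdeName, Long.MAX_VALUE);
    }
}
```

Keeping the legacy entry point compiling while steering callers toward the explicit two-argument form reflects the concern that removing the implicit outright would turn into compiler errors.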

To answer Sophie's question, "implicit" is a feature of
Scala that allows the type system to automatically resolve
method arguments when there is just one possible argument in
scope. There's a bunch of docs for it, so I won't waste a
ton of e-ink on the details; the docs will be crystal clear
just assuming you know all about monads and monoids and
type-level programming ;) 

The punch line for us is that we provide implicits for the
basic serdes, and also for turning pairs of
serializers/deserializers into serdes, so you can avoid
explicitly passing any serdes into Streams DSL operations,
but also not have to fall back on the default key/value
serde configs. Instead, the type system will plug in the
right serde for the K/V types at each operation.

We would _not_ add an implicit for a serde that we can't
construct in a context-free way using just type information,
as in this case. That's why Yuriy dropped the new implicit
and why we're going to add an error to the existing
implicit. On the other hand, removing the existing implicit
will cause compiler errors when the type system is no longer
able to find a suitable argument for an implicit parameter,
so we don't want to just remove the existing implicit.

Thanks,
-John
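Sophie's side note (quoted below) explains why the window size matters at all: a time-windowed key is serialized with only its start time, so the end time has to be rebuilt from a size. A minimal sketch of that idea, assuming a simplified layout of UTF-8 key bytes followed by an 8-byte start timestamp; `WindowedKey` and `StartOnlyWindowedCodec` are hypothetical, and this is not Kafka's actual wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical decoded form of a time-windowed key; not Kafka's Windowed/TimeWindow.
final class WindowedKey {
    final String key;
    final long startMs;
    final long endMs;

    WindowedKey(String key, long startMs, long endMs) {
        this.key = key;
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// Simplified layout: [UTF-8 key bytes][8-byte start time]. The end time is not stored.
final class StartOnlyWindowedCodec {
    private final long windowSizeMs;

    StartOnlyWindowedCodec(long windowSizeMs) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.windowSizeMs = windowSizeMs;
    }

    byte[] serialize(String key, long startMs) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(startMs)
                .array();
    }

    WindowedKey deserialize(byte[] data) {
        int keyLength = data.length - Long.BYTES;
        String key = new String(Arrays.copyOfRange(data, 0, keyLength), StandardCharsets.UTF_8);
        long startMs = ByteBuffer.wrap(data).getLong(keyLength);
        // Rebuild the end time from the configured size, saturating instead of
        // overflowing when the size is Long.MAX_VALUE.
        long endMs = startMs > Long.MAX_VALUE - windowSizeMs ? Long.MAX_VALUE : startMs + windowSizeMs;
        return new WindowedKey(key, startMs, endMs);
    }
}
```

With `Long.MAX_VALUE` as the size, a naive `start + size` would overflow, so the sketch clamps the end time to `Long.MAX_VALUE`; encoding both start and end (as session windows do) would avoid needing the size at all, at the cost of eight more bytes per key.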

On Mon, 2020-08-31 at 16:28 -0500, Leah Thomas wrote:
> Hey Sophie,
> 
> Thanks for the catch! It makes sense that the consumer would accept a
> deserializer somewhere, so we can definitely skip the additional configs. I
> updated the KIP to reflect that.
> 
> John seems to know Scala better than I do as well, but I think we need to
> keep the current implicit that allows users to just pass in a serde and no
> window size for backwards compatibility. It seems to me that based on the
> discussion around KIP-616 <https://github.com/apache/kafka/pull/8955>, we
> can pretty easily do John's third suggestion for handling this implicit:
> logging an error message and passing to a non-deprecated constructor using
> some default value. It seems from KIP-616 that most Scala users will use
> the new Serdes class anyways, and Yuriy is just removing these implicits so
> it seems like whatever fix we decide for this class won't get used too
> heavily.
> 
> Cheers,
> Leah
> 
> On Thu, Aug 27, 2020 at 8:49 PM Sophie Blee-Goldman <so...@confluent.io> wrote:
> 
> > Ok I'm definitely feeling pretty dumb now, but I was just thinking how
> > ridiculous it is that the Consumer forces you to configure your
> > Deserializer through actual config maps instead of just taking the ones
> > you pass in directly. So I thought "why not just fix the Consumer to allow
> > passing in an actual Deserializer object" and went to go through the code
> > in case there's some legitimate reason why not, and what do you know. You
> > actually can pass in an actual Deserializer object! There is a
> > KafkaConsumer constructor that accepts a key and value Deserializer, and
> > doesn't instantiate or configure a new one if provided in this way. Duh.
> >
> > Sorry for misleading everyone on that front. I'm just happy to find out
> > that a reasonable way of configuring the deserializer actually *is*
> > possible after all. In that case, maybe we can remove the extra configs
> > from this KIP and just proceed with the deprecation?
> >
> > Obviously that doesn't help anything with regard to the remaining
> > question that John/Leah have posed. Now I probably don't have anything
> > valuable to offer there since I know next to nothing about Scala, but I
> > do want to better understand: why would we add an "implicit" (what
> > exactly does this mean?) that relies on allowing users to not set the
> > windowSize, if we are explicitly taking away that option from the Java
> > users? Or if we have already added something, can't we just deprecate it
> > like we are deprecating the Java constructor? I may need some remedial
> > lessons in Scala just to understand the problem that we apparently have,
> > because I don't get it.
> >
> > By the way, I'm a little tempted to say that we should go one step
> > further and deprecate the DEFAULT_WINDOWED_INNER_CLASS configs, but maybe
> > that's a bit too radical for the moment. It just seems like default serde
> > configs have been a lot more trouble than they're worth overall. That
> > said, these particular configs don't appear to have hurt anyone thus far,
> > at least not that we know of (possibly because no one is using them
> > anyway), so there's no strong motivation to do so.
> > On Wed, Aug 26, 2020 at 9:19 AM Leah Thomas <lt...@confluent.io> wrote:
> > 
> > > Hey John,
> > > 
> > > Thanks for pointing this out, I wasn't sure how to handle the Scala
> > > changes.
> > >
> > > I'm not fully versed in the Scala version of Streams, so feel free to
> > > correct me if any of my assumptions are wrong. I think logging an error
> > > message and then calling the constructor that requires a windowSize seems
> > > like the simplest fix from my point of view. So instead of calling
> > > `TimeWindowedSerde(final Serde<T> inner)`, we could call
> > > `TimeWindowedSerde(final Serde<T> inner, final long windowSize)` with
> > > Long.MAX_VALUE as the window size.
> > >
> > > I do feel like we would want to add an implicit to `Serdes.scala` that
> > > takes a serde and a window size so that users can access the constructor
> > > that initializes with the correct window size. I agree with your comment on
> > > the KIP-616 PR that the serde needs to be pre-configured when it's passed,
> > > but I'm not sure we would need a windowSize config. I think if the
> > > constructor is passed the serde and the window size, then window size
> > > should be set within the deserializer. The only catch is if the Scala
> > > version of the consumer creates a new deserializer, and at that point we'd
> > > need a window size config, but I'm not sure if that's the case.
> > > 
> > > WDYT - is it possible to alter the existing implicit and add a new one?
> > > 
> > > On Wed, Aug 26, 2020 at 10:00 AM John Roesler <vv...@apache.org> wrote:
> > > > Hi Leah,
> > > > 
> > > > I was just reviewing the PR for KIP-616 and realized that we
> > > > forgot to mention the Scala API in your KIP. We should
> > > > consider it because `scala.Serdes.timeWindowedSerde` is
> > > > implicitly using the exact constructor you're deprecating.
> > > > 
> > > > I had some ideas in the code review:
> > > > https://github.com/apache/kafka/pull/8955#discussion_r477358755
> > > > 
> > > > What do you think is the best approach?
> > > > 
> > > > Concretely, I think Yuriy can make the call for KIP-616 (for
> > > > the new implicit that he's adding). But I think your KIP-659
> > > > should mention how we modify the existing implicit.
> > > > 
> > > > Typically, we'd try to avoid throwing new exceptions or
> > > > causing compile errors, so
> > > > * dropping the implicit is probably off the table (compile
> > > > error).
> > > > * throwing an exception in the deserializer may not be ok,
> > > > although it might still actually be ok since it's adding a
> > > > corruption check.
> > > > * logging an ERROR message and then passing through to the
> > > > underlying deserializer would be more conservative.
> > > > 
> > > > What do you think we should do?
> > > > 
> > > > Thanks,
> > > > -John
> > > > 
> > > > On Fri, 2020-08-21 at 16:05 -0500, Leah Thomas wrote:
> > > > > Thanks for the typo catch, John.
> > > > > 
> > > > > Let me know if anyone else has thoughts or ideas.
> > > > > 
> > > > > Cheers,
> > > > > Leah
> > > > > 
> > > > > On Fri, Aug 21, 2020 at 2:50 PM John Roesler <vv...@apache.org> wrote:
> > > > > > Thanks, all,
> > > > > > 
> > > > > > Based on my reading of the conversation, it sounds like I
> > > > > > have some legwork to do in KIP-645, but our collective
> > > > > > instinct is that Leah's proposal doesn't need to change to
> > > > > > account for whatever we might decide to do in KIP-645.
> > > > > > 
> > > > > > I have no further concerns about KIP-659, and I think it's a
> > > > > > good proposal.
> > > > > > 
> > > > > > Thanks,
> > > > > > -John
> > > > > > 
> > > > > > P.s., there's still a typo on the wiki that says
> > > > > > "ConsumerConfig" on the code block, even though the text now
> > > > > > says "StreamsConfig".
> > > > > > 
> > > > > > 
> > > > > > On Fri, 2020-08-21 at 10:56 -0700, Sophie Blee-Goldman wrote:
> > > > > > > Just want to make a quick comment on the question that John raised
> > > > > > > about whether we should introduce a separate config for "key" and
> > > > > > > "value" window sizes:
> > > > > > > My short answer is No, I don't think that's necessary. First of all,
> > > > > > > as you said, there is no first-class concept of a "Windowed value" in
> > > > > > > the DSL. Second, to engage in your rhetorical question, if there's no
> > > > > > > default window size for a Streams program then how can there be a
> > > > > > > sensible default for the key AND a separate sensible default for a
> > > > > > > value?
> > > > > > > I don't think we need to follow the existing pattern if it doesn't
> > > > > > > make sense, and to be honest I'm a bit skeptical that anyone was even
> > > > > > > using these default windowed inner classes, since the config wasn't
> > > > > > > even defined/documented until quite recently. I'd actually be in
> > > > > > > favor of deprecating
> > > > > > > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, but I don't
> > > > > > > want to drag that into this discussion as well.
> > > > > > >
> > > > > > > My understanding is that these were meant to mirror the default
> > > > > > > key/value serde configs, but the real use of the
> > > > > > > DEFAULT_WINDOWED_SERDE_INNER_CLASS config is actually that you can at
> > > > > > > least use it to configure the inner class for a Consumer, thus making
> > > > > > > the TimeWindowed serdes functional at a basic level. With the window
> > > > > > > size configs, the point is not really to set a default but to make it
> > > > > > > actually work with a Consumer which instantiates the deserializer by
> > > > > > > reflection. So I don't think we should position this new config as a
> > > > > > > "default" (although it may technically behave as one) -- within
> > > > > > > Streams, users can and should always supply the window size through
> > > > > > > the constructor. I don't think that's such an inconvenience, vs the
> > > > > > > amount of confusion that will be (and has been) caused by default
> > > > > > > serde-related configs in Streams.
> > > > > > >
> > > > > > > Regarding the fixed vs variable sized config, one idea I had was to
> > > > > > > just keep the fixed-size config and constructor and let users of
> > > > > > > enumerable windows override the TimeWindowedSerde class(es) to do
> > > > > > > whatever it is they need. IIUC you already have to override some
> > > > > > > other windows-related classes to get variable-sized windows, so doing
> > > > > > > the same for the serdes sounds reasonable to me.
> > > > > > > Just my take on the "simple things should be easy, difficult things
> > > > > > > should be possible" mantra.
> > > > > > >
> > > > > > > One last quick side note: the reason we don't really need to discuss
> > > > > > > SessionWindows here is that they already encode both the start and
> > > > > > > end time for the window. This is probably the best way to go for
> > > > > > > TimeWindows as well, but making this change in a backwards compatible
> > > > > > > way is a much larger scope of work. And even then, we might want to
> > > > > > > consider making it possible to still just encode the start time to
> > > > > > > save space, thus requiring this config either way.
> > > > > > > 
> > > > > > > On Fri, Aug 21, 2020 at 9:26 AM Leah Thomas <lthomas@confluent.io> wrote:
> > > > > > > > Thanks John and Walker for your thoughts.
> > > > > > > >
> > > > > > > > I agree with your two scenarios, John: either you configure fully in
> > > > > > > > the constructor and don't need to call `init()`, or you call the
> > > > > > > > default constructor and then `init()`. IIUC, if we pass the
> > > > > > > > deserializer to the consumer, we want to make sure it has the window
> > > > > > > > size set using the newly required constructor. If we don't pass in
> > > > > > > > the deserializer, the window size will be set through the configs. To
> > > > > > > > answer Walker's question directly, because the configs aren't passed
> > > > > > > > to the constructor, we can't set the window size unless we pass it to
> > > > > > > > the constructor or configure the deserializer after instantiating it.
> > > > > > > >
> > > > > > > > For users who would rather not set a strict window size (outside of
> > > > > > > > the variable size scenario), they can pass in Long.MAX_VALUE. The way
> > > > > > > > I see this is instead of having the default be for scenarios that
> > > > > > > > don't require a window size, we have the default be the scenarios
> > > > > > > > that *do*, flipping the current implementation to fit with typical
> > > > > > > > use cases.
> > > > > > > >
> > > > > > > > On your points John:
> > > > > > > > 1. I agree that it makes sense to store it in StreamsConfig; this
> > > > > > > > shouldn't cause any issues. I've updated the KIP accordingly.
> > > > > > > >
> > > > > > > > 2. The non-fixed time windows issue is a good point. It seems like
> > > > > > > > calendar windows in particular are quite useful, so I think we want
> > > > > > > > to make sure that this wouldn't inhibit flexibly sized windows. I
> > > > > > > > think having two different configs and functions makes sense,
> > > > > > > > although it is slightly messier. While requiring all time windows to
> > > > > > > > use the WindowFunction constructor would work, I think that allowing
> > > > > > > > users to access the WindowSize constructor is preferable because it
> > > > > > > > seems easier to use for people who are not at all interested in
> > > > > > > > delving into variably sized windows. This assumption could be wrong
> > > > > > > > though, and perhaps users would adapt quickly to the new
> > > > > > > > WindowFunction style, but my immediate reaction is to support both
> > > > > > > > configs and constructors.
> > > > > > > >
> > > > > > > > One note on this is that Session Windows are handled separately from
> > > > > > > > time windows and also have variable window sizes. I assume that the
> > > > > > > > TimeWindowed option is preferable for variably sized windows because
> > > > > > > > you still want to access the window end times? But I think one
> > > > > > > > alternative could be separating the variably sized windows from the
> > > > > > > > current implementation of time windows, although I think KIP-645
> > > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> > > > > > > > would make this not strictly necessary.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Leah
> > > > > > > > 
> > > > > > > > On Fri, Aug 21, 2020 at 10:04 AM John Roesler <vvcephei@apache.org> wrote:
> > > > > > > > > Hi Leah,
> > > > > > > > > 
> > > > > > > > > Thanks for the KIP! This has been a real pain for some use
> > > > > > > > > cases, so it's really good to see a proposal to fix it.
> > > > > > > > > 
> > > > > > > > > We do need a default constructor so that it can be
> > > > > > > > > dynamically instantiated by the consumer (or any other
> > > > > > > > > component). But I'm +1 on deprecating the constructor you're
> > > > > > > > > proposing to deprecate, which only partially configures the
> > > > > > > > > class. It seems like there are exactly two patterns: either
> > > > > > > > > you fully configure the class in the constructor and don't
> > > > > > > > > call `init()`, or you call the default constructor and then
> > > > > > > > > configure the class by calling `init()`.
> > > > > > > > > 
> > > > > > > > > I can appreciate Walker's point, but stepping back, it
> > > > > > > > > doesn't actually seem that useful to partially configure the
> > > > > > > > > class in the constructor and then finish up the
> > > > > > > > > configuration by calling `init()`. I could see the argument
> > > > > > > > > if there were a sensible default, but for this particular
> > > > > > > > > class, there isn't one. Rhetorical question: what is the
> > > > > > > > > default window size for Streams programs?
> > > > > > > > > 
> > > > > > > > > I have a couple of concerns to discuss:
> > > > > > > > > 
> > > > > > > > > 1. Config Location
> > > > > > > > > 
> > > > > > > > > I don't think I would add the new configs to ConsumerConfig,
> > > > > > > > > but would add them to StreamsConfig instead. The deserializer
> > > > > > > > > itself is in Streams (it is
> > > > > > > > > o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
> > > > > > > > > odd to have one of its configurations in a completely
> > > > > > > > > different module.
> > > > > > > > > 
> > > > > > > > > Also, this class already has two configs, which are in
> > > > > > > > > StreamsConfig:
> > > > > > > > > StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
> > > > > > > > > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> > > > > > > > > 
> > > > > > > > > It seems like the new config belongs right next to the
> > > > > > > > > existing ones.
> > > > > > > > > 
> > > > > > > > > For me, it raises a secondary question:
> > > > > > > > > 1b: Should there be a KEY_WINDOW_SIZE and a
> > > > > > > > > VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
> > > > > > > > > value" even is, but the fact that we can configure serdes
> > > > > > > > > for it implies that perhaps we should symmetrically
> > > > > > > > > configure its size as well.
> > > > > > > > > 
> > > > > > > > > 2. Fixed Size Assumption
> > > > > > > > > 
> > > > > > > > > In KIP-645, I'm proposing to lift the assumption that
> > > > > > > > > TimeWindows have a fixed size at all, but KIP-659 is
> > > > > > > > > currently built on that assumption.
> > > > > > > > > 
> > > > > > > > > For details on why this is not a good assumption, see:
> > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-10408
> > > > > > > > > 
> > > > > > > > > In fact, in my POC PR for KIP-659, I'm dropping the
> > > > > > > > > constructor that takes a "window size" parameter in favor of
> > > > > > > > > one that takes a window function, mapping a window start
> > > > > > > > > time to a full Window(start, end).
> > > > > > > > > 
> > > > > > > > > In that context, it seems incongruous to introduce a
> > > > > > > > > configuration that specifies a window size. Of course, my
> > > > > > > > > KIP is also under discussion, so my proposal may not
> > > > > > > > > eventually be accepted. But it is necessary to consider both
> > > > > > > > > of these concerns together.
> > > > > > > > > 
> > > > > > > > > One option seems to be to accept both. Namely, we keep the
> > > > > > > > > "fixed size" constructor AND add my new constructor (for
> > > > > > > > > variably sized windows). Likewise, we accept your proposal,
> > > > > > > > > and KIP-659 would propose to add a new config specifying a
> > > > > > > > > windowing function, such as:
> > > > > > > > > 
> > > > > > > > > > StreamsConfig.WINDOW_FUNCTION_CONFIG
> > > > > > > > > 
> > > > > > > > > which would be an instance of:
> > > > > > > > > 
> > > > > > > > > > public interface WindowFunction extends Function<Long, Window> {}
> > > > > > > > > 
> > > > > > > > > I'm not bringing these up for discussion in your KIP right
> > > > > > > > > now, just demonstrating the feasibility of merging both
> > > > > > > > > proposals.
> > > > > > > > > 
> > > > > > > > > My question for you: do you think the general strategy of
> > > > > > > > > having two constructors and two configs, one for fixed and
> > > > > > > > > one for variable windows, makes sense? Is it too
> > > > > > > > > complicated? Do you have a better idea?
> > > > > > > > > 
> > > > > > > > > Thanks!
> > > > > > > > > -John
> > > > > > > > > 
> > > > > > > > > On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> > > > > > > > > > Hi Leah,
> > > > > > > > > > 
> > > > > > > > > > Could you explain a bit more why we do not wish to
> > > > > > > > > > let TimeWindowedDeserializer and WindowedSerdes be created without a
> > > > > > > > > > specified time as a parameter?
> > > > > > > > > >
> > > > > > > > > > I understand that Long.MAX_VALUE could cause problems, but would it not
> > > > > > > > > > be a good idea to have a usable default, or to fetch it from the config if
> > > > > > > > > > available? After all, you are proposing to add "window.size.ms".
> > > > > > > > > >
> > > > > > > > > > We definitely need a fix to this problem, and adding "window.size.ms"
> > > > > > > > > > makes sense to me.
> > > > > > > > > > 
> > > > > > > > > > Thanks for the KIP,
> > > > > > > > > > Walker
> > > > > > > > > > 
> > > > > > > > > > On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <lthomas@confluent.io>
> > > > > > > > > > wrote:
> > > > > > > > > > > Hi all,
> > > > > > > > > > >
> > > > > > > > > > > I'd like to start a discussion for KIP-659:
> > > > > > > > > > >
> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > > > > > > > > > >
> > > > > > > > > > > The goal of the KIP is to ensure that window size is passed to the consumer
> > > > > > > > > > > when needed, which will generally be for testing purposes, and to avoid
> > > > > > > > > > > runtime errors when the *TimeWindowedSerde* is created without a window
> > > > > > > > > > > size.
> > > > > > > > > > > 
> > > > > > > > > > > Looking forward to hearing your feedback.
> > > > > > > > > > > 
> > > > > > > > > > > Cheers,
> > > > > > > > > > > Leah
> > > > > > > > > > > 
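John's proposed `WindowFunction` (a function from a window start time to a full `Window(start, end)`) can be illustrated with a small, dependency-free Java sketch. It shows how a fixed window size becomes just one particular window function, next to a variable-size one such as calendar months. `SimpleWindow`, `fixedSize`, and `calendarMonthUtc` are hypothetical names chosen for illustration; they are not part of Kafka Streams or of either KIP. Note that in Java an interface extends `Function`; it cannot implement it.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.function.Function;

// Hypothetical stand-in for a time window: start inclusive, end exclusive, epoch milliseconds.
final class SimpleWindow {
    final long startMs;
    final long endMs;

    SimpleWindow(long startMs, long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// Hypothetical version of the proposed interface: maps a window start time to a full window.
interface WindowFunction extends Function<Long, SimpleWindow> {

    // Fixed-size windows: every window ends exactly sizeMs after it starts.
    static WindowFunction fixedSize(long sizeMs) {
        return start -> new SimpleWindow(start, start + sizeMs);
    }

    // Variable-size windows: a window ends at the next UTC calendar-month boundary,
    // so its length depends on the start time (28 to 31 days for month-aligned starts).
    static WindowFunction calendarMonthUtc() {
        return start -> {
            LocalDate monthStart = Instant.ofEpochMilli(start)
                    .atZone(ZoneOffset.UTC)
                    .toLocalDate()
                    .withDayOfMonth(1);
            long endMs = monthStart.plusMonths(1)
                    .atStartOfDay(ZoneOffset.UTC)
                    .toInstant()
                    .toEpochMilli();
            return new SimpleWindow(start, endMs);
        };
    }
}
```

In this framing a fixed size is simply the special case `fixedSize(...)`, which is one way to read John's suggestion that a size-based constructor and a function-based constructor (and their configs) could coexist.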


Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Posted by Leah Thomas <lt...@confluent.io>.
Hey Sophie,

Thanks for the catch! It makes sense that the consumer would accept a
deserializer somewhere, so we can definitely skip the additional configs. I
updated the KIP to reflect that.

John seems to know Scala better than I do as well, but I think we need to
keep the current implicit that allows users to just pass in a serde and no
window size for backwards compatibility. It seems to me that based on the
discussion around KIP-616 <https://github.com/apache/kafka/pull/8955>, we
can pretty easily do John's third suggestion for handling this implicit:
logging an error message and passing to a non-deprecated constructor using
some default value. It seems from KIP-616 that most Scala users will use
the new Serdes class anyways, and Yuriy is just removing these implicits so
it seems like whatever fix we decide for this class won't get used too
heavily.

Cheers,
Leah
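The fallback described above (keep the existing single-argument entry point, log an error, and delegate to a non-deprecated constructor that takes a window size, using `Long.MAX_VALUE` as the default Leah suggested on Aug 26) could be sketched roughly as follows. This is plain Java rather than the actual Scala implicit in `Serdes.scala`; `WindowedSerdeSketch` and `WindowedSerdeFactory` are hypothetical names, and `java.util.logging`'s SEVERE level stands in for an ERROR log.

```java
import java.util.logging.Logger;

// Hypothetical, simplified stand-in for a time-windowed serde: it only records the
// inner serde's name and the window size. It is not Kafka's TimeWindowedSerde.
final class WindowedSerdeSketch {
    final String innerName;
    final long windowSizeMs;

    WindowedSerdeSketch(String innerName, long windowSizeMs) {
        this.innerName = innerName;
        this.windowSizeMs = windowSizeMs;
    }
}

// Hypothetical factory playing the role of the implicit/helper that users call.
final class WindowedSerdeFactory {
    private static final Logger LOG = Logger.getLogger(WindowedSerdeFactory.class.getName());

    // Preferred entry point: the caller states the window size explicitly.
    static WindowedSerdeSketch timeWindowed(String innerName, long windowSizeMs) {
        return new WindowedSerdeSketch(innerName, windowSizeMs);
    }

    // Legacy entry point kept so existing callers still compile: log at SEVERE
    // (java.util.logging's closest level to ERROR), then delegate with a fallback size.
    static WindowedSerdeSketch timeWindowed(String innerName) {
        LOG.severe("No window size given for the time-windowed serde wrapping " + innerName
                + "; falling back to Long.MAX_VALUE. Pass an explicit window size instead.");
        return timeWindowed(innerName, Long.MAX_VALUE);
    }
}
```

The design choice here is that old call sites keep compiling and keep working, while the log message steers users toward the explicit-size variant.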

On Thu, Aug 27, 2020 at 8:49 PM Sophie Blee-Goldman <so...@confluent.io>
wrote:

> Ok I'm definitely feeling pretty dumb now, but I was just thinking how
> ridiculous it is that the Consumer forces you to configure your Deserializer
> through actual config maps instead of just taking the ones you pass in
> directly. So I thought "why not just fix the Consumer to allow passing in an
> actual Deserializer object" and went to go through the code in case there's
> some legitimate reason why not, and what do you know. You actually can pass
> in an actual Deserializer object! There is a KafkaConsumer constructor that
> accepts a key and value Deserializer, and doesn't instantiate or configure a
> new one if provided in this way. Duh.
>
> Sorry for misleading everyone on that front. I'm just happy to find out that
> a reasonable way of configuring a deserializer actually *is* possible after
> all. In that case, maybe we can remove the extra configs from this KIP and
> just proceed with the deprecation?
>
> Obviously that doesn't help anything with regards to the remaining question
> that John/Leah have posed. Now I probably don't have anything valuable to
> offer there since I know next to nothing about Scala, but I do want to better
> understand: why would we add an "implicit" (what exactly does this mean?)
> that relies on allowing users to not set the windowSize, if we are explicitly
> taking away that option from the Java users? Or if we have already added
> something, can't we just deprecate it like we are deprecating the Java
> constructor? I may need some remedial lessons in Scala just to understand
> the problem that we apparently have, because I don't get it.
>
> By the way, I'm a little tempted to say that we should go one step further
> and deprecate the DEFAULT_WINDOWED_INNER_CLASS configs, but maybe that's a
> bit too radical for the moment. It just seems like default serde configs
> have been a lot more trouble than they're worth overall. That said, these
> particular configs don't appear to have hurt anyone thus far, at least not
> that we know of (possibly because no one is using it anyway), so there's no
> strong motivation to do so.
>
> On Wed, Aug 26, 2020 at 9:19 AM Leah Thomas <lt...@confluent.io> wrote:
>
> > Hey John,
> >
> > Thanks for pointing this out, I wasn't sure how to handle the Scala
> > changes.
> >
> > I'm not fully versed in the Scala version of Streams, so feel free to
> > correct me if any of my assumptions are wrong. I think logging an error
> > message and then calling the constructor that requires a windowSize seems
> > like the simplest fix from my point of view. So instead of
> > calling `TimeWindowedSerde(final Serde<T> inner)`, we could
> > call `TimeWindowedSerde(final Serde<T> inner, final long windowSize)` with
> > Long.MAX_VALUE as the window size.
> >
> > I do feel like we would want to add an implicit to `Serdes.scala` that
> > takes a serde and a window size so that users can access the constructor
> > that initializes with the correct window size. I agree with your comment on
> > the KIP-616 PR that the serde needs to be pre-configured when it's passed,
> > but I'm not sure we would need a windowSize config. I think if the
> > constructor is passed the serde and the window size, then window size
> > should be set within the deserializer. The only catch is if the Scala
> > version of the consumer creates a new deserializer, and at that point we'd
> > need a window size config, but I'm not sure if that's the case.
> >
> > WDYT - is it possible to alter the existing implicit and add a new one?
> >
> > On Wed, Aug 26, 2020 at 10:00 AM John Roesler <vv...@apache.org>
> > wrote:
> >
> > > Hi Leah,
> > >
> > > I was just reviewing the PR for KIP-616 and realized that we
> > > forgot to mention the Scala API in your KIP. We should
> > > consider it because `scala.Serdes.timeWindowedSerde` is
> > > implicitly using the exact constructor you're deprecating.
> > >
> > > I had some ideas in the code review:
> > > https://github.com/apache/kafka/pull/8955#discussion_r477358755
> > >
> > > What do you think is the best approach?
> > >
> > > Concretely, I think Yuriy can make the call for KIP-616 (for
> > > the new implicit that he's adding). But I think your KIP-659
> > > should mention how we modify the existing implicit.
> > >
> > > Typically, we'd try to avoid throwing new exceptions or
> > > causing compile errors, so
> > > * dropping the implicit is probably off the table (compile
> > > error).
> > > * throwing an exception in the deserializer may not be ok,
> > > although it might still actually be ok since it's adding a
> > > corruption check.
> > > * logging an ERROR message and then passing through to the
> > > underlying deserializer would be more conservative.
> > >
> > > What do you think we should do?
> > >
> > > Thanks,
> > > -John
> > >
> > > On Fri, 2020-08-21 at 16:05 -0500, Leah Thomas wrote:
> > > > Thanks for the typo catch, John.
> > > >
> > > > Let me know if anyone else has thoughts or ideas.
> > > >
> > > > Cheers,
> > > > Leah
> > > >
> > > > On Fri, Aug 21, 2020 at 2:50 PM John Roesler <vv...@apache.org>
> > > > wrote:
> > > >
> > > > > Thanks, all,
> > > > >
> > > > > Based on my reading of the conversation, it sounds like I
> > > > > have some legwork to do in KIP-645, but our collective
> > > > > instinct is that Leah's proposal doesn't need to change to
> > > > > account for whatever we might decide to do in KIP-645.
> > > > >
> > > > > I have no further concerns about KIP-659, and I think it's a
> > > > > good proposal.
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > P.s., there's still a typo on the wiki that says
> > > > > "ConsumerConfig" on the code block, even though the text now
> > > > > says "StreamsConfig".
> > > > >
> > > > >
> > > > > On Fri, 2020-08-21 at 10:56 -0700, Sophie Blee-Goldman
> > > > > wrote:
> > > > > > Just want to make a quick comment on the question that John raised about
> > > > > > whether we should introduce a separate config for "key" and "value" window
> > > > > > sizes:
> > > > > >
> > > > > > My short answer is No, I don't think that's necessary. First of all, as you
> > > > > > said, there is no first-class concept of a "Windowed value" in the DSL.
> > > > > > Second, to engage in your rhetorical question, if there's no default window
> > > > > > size for a Streams program then how can there be a sensible default for the
> > > > > > key AND a separate sensible default for a value?
> > > > > >
> > > > > > I don't think we need to follow the existing pattern if it doesn't make
> > > > > > sense, and to be honest I'm a bit skeptical that anyone was even using these
> > > > > > default windowed inner classes since the config wasn't even defined/documented
> > > > > > until quite recently. I'd actually be in favor of deprecating
> > > > > > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, but I don't want to
> > > > > > drag that into this discussion as well.
> > > > > >
> > > > > > My understanding is that these were meant to mirror the default key/value
> > > > > > serde configs, but the real use of the DEFAULT_WINDOWED_SERDE_INNER_CLASS
> > > > > > config is actually that you can at least use it to configure the inner class
> > > > > > for a Consumer, thus making the TimeWindowed serdes functional at a basic
> > > > > > level. With the window size configs, the point is not really to set a default
> > > > > > but to make it actually work with a Consumer which instantiates the
> > > > > > deserializer by reflection. So I don't think we should position this new
> > > > > > config as a "default" (although it may technically behave as one) -- within
> > > > > > Streams users can and should always supply the window size through the
> > > > > > constructor. I don't think that's such an inconvenience, vs the amount of
> > > > > > confusion that will be (and has been) caused by default serde-related
> > > > > > configs in Streams.
> > > > > >
> > > > > > Regarding the fixed vs variable sized config, one idea I had was to just keep
> > > > > > the fixed-size config and constructor and let users of enumerable windows
> > > > > > override the TimeWindowedSerde class(es) to do whatever it is they need. IIUC
> > > > > > you already have to override some other windows-related classes to get
> > > > > > variable-sized windows, so doing the same for the serdes sounds reasonable to
> > > > > > me. Just my take on the "simple things should be easy, difficult things
> > > > > > should be possible" mantra.
> > > > > >
> > > > > > One last quick side note: the reason we don't really need to discuss
> > > > > > SessionWindows here is that they already encode both the start and end time
> > > > > > for the window. This is probably the best way to go for TimeWindows as well,
> > > > > > but making this change in a backwards compatible way is a much larger scope
> > > > > > of work. And even then, we might want to consider making it possible to
> > > > > > still just encode the start time to save space, thus requiring this config
> > > > > > either way.
> > > > > >
> > > > > > On Fri, Aug 21, 2020 at 9:26 AM Leah Thomas <lthomas@confluent.io>
> > > > > > wrote:
> > > > > > > Thanks John and Walker for your thoughts.
> > > > > > >
> > > > > > > I agree with your two scenarios John, that you configure fully in the
> > > > > > > constructor, or you don't need to call `init()`. IIUC, if we pass the
> > > > > > > deserializer to the consumer, we want to make sure it has the window size set
> > > > > > > using the newly required constructor. If we don't pass in the deserializer,
> > > > > > > the window size will be set through the configs. To answer Walker's question
> > > > > > > directly, because the configs aren't passed to the constructor, we can't set
> > > > > > > the window size unless we pass it to the constructor or configure the
> > > > > > > constructor after initializing it.
> > > > > > >
> > > > > > > For users who would rather not set a strict window size (outside of the
> > > > > > > variable size scenario), they can pass in Long.MAX_VALUE. The way I see this
> > > > > > > is instead of having the default be for scenarios that don't require a
> > > > > > > window size, we have the default be the scenarios that *do*, flipping the
> > > > > > > current implementation to fit with typical use cases.
> > > > > > >
> > > > > > > On your points John:
> > > > > > > 1. I agree that it makes sense to store it in StreamsConfig; this shouldn't
> > > > > > > cause any issues. I've updated the KIP accordingly.
> > > > > > >
> > > > > > > 2. The non-fixed time windows issue is a good point. It seems like calendar
> > > > > > > windows in particular are quite useful, so I think we want to make sure that
> > > > > > > this wouldn't inhibit flexible sized windows. I think having two different
> > > > > > > configs and functions makes sense, although it is slightly messier. While
> > > > > > > requiring all time windows to use the WindowFunction constructor would work,
> > > > > > > I think that allowing users to access the WindowSize constructor is
> > > > > > > preferable because it seems easier to use for people who are not at all
> > > > > > > interested in delving into variably sized windows. This assumption could be
> > > > > > > wrong though, and perhaps users would adapt quickly to the new
> > > > > > > WindowFunction style, but my immediate reaction is to support both configs
> > > > > > > and constructors.
> > > > > > >
> > > > > > > One note on this is that Session Windows are handled separately from time
> > > > > > > windows and also have variable window sizes. I assume that the TimeWindowed
> > > > > > > option is preferable for variably sized windows because you still want to
> > > > > > > access the window end times? But I think one alternative could be
> > > > > > > separating the variably sized windows from the current implementation of
> > > > > > > time windows, although I think KIP-645
> > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> > > > > > > would make this not strictly necessary.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Leah
> > > > > > >
> > > > > > > On Fri, Aug 21, 2020 at 10:04 AM John Roesler <vvcephei@apache.org>
> > > > > > > wrote:
> > > > > > > > Hi Leah,
> > > > > > > >
> > > > > > > > Thanks for the KIP! This has been a real pain for some use
> > > > > > > > cases, so it's really good to see a proposal to fix it.
> > > > > > > >
> > > > > > > > We do need a default constructor so that it can be
> > > > > > > > dynamically instantiated by the consumer (or any other
> > > > > > > > component). But I'm +1 on deprecating the constructor you're
> > > > > > > > proposing to deprecate, which only partially configures the
> > > > > > > > class. It seems like there are exactly two patterns: either
> > > > > > > > you fully configure the class in the constructor and don't
> > > > > > > > call `init()`, or you call the default constructor and then
> > > > > > > > configure the class by calling `init()`.
> > > > > > > >
> > > > > > > > I can appreciate Walker's point, but stepping back, it
> > > > > > > > doesn't actually seem that useful to partially configure the
> > > > > > > > class in the constructor and then finish up the
> > > > > > > > configuration by calling `init()`. I could see the argument
> > > > > > > > if there were a sensible default, but for this particular
> > > > > > > > class, there isn't one. Rhetorical question: what is the
> > > > > > > > default window size for Streams programs?
> > > > > > > >
> > > > > > > > I have a couple of concerns to discuss:
> > > > > > > >
> > > > > > > > 1. Config Location
> > > > > > > >
> > > > > > > > I don't think I would add the new configs to ConsumerConfig,
> > > > > > > > but would add it to StreamsConfig instead. The deserializer
> > > > > > > > itself is in Streams (it is
> > > > > > > > o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
> > > > > > > > odd to have one of its configurations in a completely
> > > > > > > > different module.
> > > > > > > >
> > > > > > > > Also, this class already has two configs, which are in
> > > > > > > > StreamsConfig:
> > > > > > > > StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
> > > > > > > > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> > > > > > > >
> > > > > > > > It seems like the new config belongs right next to the
> > > > > > > > existing ones.
> > > > > > > >
> > > > > > > > For me, it raises a secondary question:
> > > > > > > > 1b: Should there be a KEY_WINDOW_SIZE and a
> > > > > > > > VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
> > > > > > > > value" even is, but the fact that we can configure serdes
> > > > > > > > for it implies that perhaps we should symmetrically
> > > > > > > > configure its size as well.
> > > > > > > >
> > > > > > > > 2. Fixed Size Assumption
> > > > > > > >
> > > > > > > > In KIP-645, I'm proposing to lift the assumption that
> > > > > > > > TimeWindows have a fixed size at all, but KIP-659 is
> > > > > > > > currently built on that assumption.
> > > > > > > >
> > > > > > > > For details on why this is not a good assumption, see:
> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-10408
> > > > > > > >
> > > > > > > > In fact, in my POC PR for KIP-659, I'm dropping the
> > > > > > > > constructor that takes a "window size" parameter in favor of
> > > > > > > > one that takes a window function, mapping a window start
> > > > > > > > time to a full Window(start, end).
> > > > > > > >
> > > > > > > > In that context, it seems incongruous to introduce a
> > > > > > > > configuration that specifies a window size. Of course, my
> > > > > > > > KIP is also under discussion, so my proposal may not
> > > > > > > > eventually be accepted. But it is necessary to consider both
> > > > > > > > of these concerns together.
> > > > > > > >
> > > > > > > > One option seems to be to accept both. Namely, we keep the
> > > > > > > > "fixed size" constructor AND add my new constructor (for
> > > > > > > > variably sized windows). Likewise, we accept your proposal,
> > > > > > > > and KIP-659 would propose to add a new config specifying a
> > > > > > > > windowing function, such as:
> > > > > > > >
> > > > > > > > > StreamsConfig.WINDOW_FUNCTION_CONFIG
> > > > > > > >
> > > > > > > > which would be an instance of:
> > > > > > > >
> > > > > > > > > public interface WindowFunction extends Function<Long, Window> {}
> > > > > > > >
> > > > > > > > I'm not bringing these up for discussion in your KIP right
> > > > > > > > now, just demonstrating the feasibility of merging both
> > > > > > > > proposals.
> > > > > > > >
> > > > > > > > My question for you: do you think the general strategy of
> > > > > > > > having two constructors and two configs, one for fixed and
> > > > > > > > one for variable windows, makes sense? Is it too
> > > > > > > > complicated? Do you have a better idea?
> > > > > > > >
> > > > > > > > Thanks!
> > > > > > > > -John
> > > > > > > >
> > > > > > > > On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> > > > > > > > > Hi Leah,
> > > > > > > > >
> > > > > > > > > Could you explain a bit more why we do not wish to
> > > > > > > > > let TimeWindowedDeserializer and WindowedSerdes be created without a
> > > > > > > > > specified time as a parameter?
> > > > > > > > >
> > > > > > > > > I understand that Long.MAX_VALUE could cause problems, but would it not
> > > > > > > > > be a good idea to have a usable default, or to fetch it from the config if
> > > > > > > > > available? After all, you are proposing to add "window.size.ms".
> > > > > > > > >
> > > > > > > > > We definitely need a fix to this problem, and adding "window.size.ms"
> > > > > > > > > makes sense to me.
> > > > > > > > >
> > > > > > > > > Thanks for the KIP,
> > > > > > > > > Walker
> > > > > > > > >
> > > > > > > > > On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <lthomas@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > > > Hi all,
> > > > > > > > > >
> > > > > > > > > > I'd like to start a discussion for KIP-659:
> > > > > > > > > >
> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > > > > > > > > >
> > > > > > > > > > The goal of the KIP is to ensure that window size is passed to the consumer
> > > > > > > > > > when needed, which will generally be for testing purposes, and to avoid
> > > > > > > > > > runtime errors when the *TimeWindowedSerde* is created without a window
> > > > > > > > > > size.
> > > > > > > > > >
> > > > > > > > > > Looking forward to hearing your feedback.
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Leah
> > > > > > > > > >
> > >
> > >
> >
>
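Two points from this thread meet in the deserializer: Sophie's note that time windows are serialized with only their start time (unlike session windows, which carry both ends), and the suggestion to pass `Long.MAX_VALUE` when no real window size applies. The sketch below assumes a simplified key layout of UTF-8 key bytes followed by an 8-byte start timestamp, which is an illustration and not necessarily Kafka's exact wire format; `StartOnlyWindowCodec` is a hypothetical name. It shows why the window end cannot be recovered without an externally supplied size, and why a naive `start + size` overflows for `Long.MAX_VALUE`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec for a time-windowed key. Assumed layout (illustration only):
// [UTF-8 key bytes][8-byte window start in ms]. The window end is NOT stored.
final class StartOnlyWindowCodec {

    static byte[] encode(String key, long windowStartMs) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(windowStartMs)
                .array();
    }

    static String decodeKey(byte[] data) {
        return new String(data, 0, data.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    // The size must come from outside the bytes (a constructor argument or a config),
    // because only the start time was serialized. Returns {startMs, endMs}.
    static long[] decodeWindow(byte[] data, long windowSizeMs) {
        long startMs = ByteBuffer.wrap(data, data.length - Long.BYTES, Long.BYTES).getLong();
        return new long[] {startMs, endFor(startMs, windowSizeMs)};
    }

    // A naive start + size wraps to a negative number for sizes like Long.MAX_VALUE
    // (when the start is positive), so this sketch clamps the end on overflow.
    static long endFor(long startMs, long windowSizeMs) {
        try {
            return Math.addExact(startMs, windowSizeMs);
        } catch (ArithmeticException overflow) {
            return Long.MAX_VALUE;
        }
    }
}
```

Clamping is only one possible policy; the point of the sketch is that whatever size the deserializer is given, it is the sole source of the window end.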

Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Posted by Sophie Blee-Goldman <so...@confluent.io>.
Ok I'm definitely feeling pretty dumb now, but I was just thinking how
ridiculous it is that the Consumer forces you to configure your Deserializer
through actual config maps instead of just taking the ones you pass in
directly. So I thought "why not just fix the Consumer to allow passing in an
actual Deserializer object" and went to go through the code in case there's
some legitimate reason why not, and what do you know. You actually can pass
in an actual Deserializer object! There is a KafkaConsumer constructor that
accepts a key and value Deserializer, and doesn't instantiate or configure a
new one if provided in this way. Duh.

Sorry for misleading everyone on that front. I'm just happy to find out that
a reasonable way of configuring a deserializer actually *is* possible after
all. In that case, maybe we can remove the extra configs from this KIP and
just proceed with the deprecation?

Obviously that doesn't help anything with regards to the remaining question
that John/Leah have posed. Now I probably don't have anything valuable to
offer there since I know next to nothing about Scala, but I do want to better
understand: why would we add an "implicit" (what exactly does this mean?)
that relies on allowing users to not set the windowSize, if we are explicitly
taking away that option from the Java users? Or if we have already added
something, can't we just deprecate it like we are deprecating the Java
constructor? I may need some remedial lessons in Scala just to understand
the problem that we apparently have, because I don't get it.

By the way, I'm a little tempted to say that we should go one step further
and deprecate the DEFAULT_WINDOWED_INNER_CLASS configs, but maybe that's a
bit too radical for the moment. It just seems like default serde configs
have been a lot more trouble than they're worth overall. That said, these
particular configs don't appear to have hurt anyone thus far, at least not
that we know of (possibly
because no one is using it anyway) so there's no strong motivation to do so
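Sophie's observation distinguishes two ways a client can end up with a deserializer, which also line up with the two configuration patterns John described earlier in the thread ("configure fully in the constructor" versus "default constructor, then `init()`", called `configure()` here): hand over a fully constructed instance, or let the client instantiate a class by name through its no-arg constructor and then configure it from a config map. The sketch below models both paths in plain Java without depending on Kafka. `SketchDeserializer`, `WindowedKeyDeserializerSketch`, and `ClientSketch` are hypothetical names, and the real `KafkaConsumer` and `Deserializer` APIs differ in their details.

```java
import java.util.Map;

// Simplified stand-in for a deserializer contract; Kafka's real Deserializer interface
// has a different configure signature and a deserialize method, both omitted here.
interface SketchDeserializer {
    void configure(Map<String, ?> configs);

    long windowSizeMs();
}

final class WindowedKeyDeserializerSketch implements SketchDeserializer {
    // Hypothetical config key, echoing the "window.size.ms" name used in this thread.
    static final String WINDOW_SIZE_MS = "window.size.ms";

    private Long windowSizeMs; // null until configured

    // Pattern 1: no-arg constructor, so a client can create it reflectively and then configure it.
    public WindowedKeyDeserializerSketch() {}

    // Pattern 2: fully configured in the constructor; configure() then has nothing to do.
    public WindowedKeyDeserializerSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        if (windowSizeMs != null) {
            return; // already set by the constructor
        }
        Object size = configs.get(WINDOW_SIZE_MS);
        if (size == null) {
            throw new IllegalArgumentException(WINDOW_SIZE_MS + " must be set");
        }
        windowSizeMs = Long.parseLong(size.toString());
    }

    @Override
    public long windowSizeMs() {
        if (windowSizeMs == null) {
            throw new IllegalStateException("window size was never configured");
        }
        return windowSizeMs;
    }
}

// Hypothetical client mirroring the two ways a consumer can obtain a deserializer.
final class ClientSketch {
    static SketchDeserializer resolve(SketchDeserializer provided, String className,
                                      Map<String, ?> configs) {
        if (provided != null) {
            return provided; // pre-built instance: used as-is, no reflection, no configure()
        }
        try {
            SketchDeserializer created = (SketchDeserializer) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
            created.configure(configs);
            return created;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }
}
```

Only the reflective path ever reads `window.size.ms`, which matches the thread's conclusion that such a config mainly matters when a client instantiates the deserializer by class name.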

On Wed, Aug 26, 2020 at 9:19 AM Leah Thomas <lt...@confluent.io> wrote:

> Hey John,
>
> Thanks for pointing this out, I wasn't sure how to handle the Scala
> changes.
>
> I'm not fully versed in the Scala version of Streams, so feel free to
> correct me if any of my assumptions are wrong. I think logging an error
> message and then calling the constructor that requires a windowSize seems
> like the simplest fix from my point of view. So instead of
> calling `TimeWindowedSerde(final Serde<T> inner)`, we could
> call `TimeWindowedSerde(final Serde<T> inner, final long windowSize)` with
> Long.MAX_VALUE as the window size.
>
> I do feel like we would want to add an implicit to `Serdes.scala` that
> takes a serde and a window size so that users can access the constructor
> that initializes with the correct window size. I agree with your comment on
> the KIP-616 PR that the serde needs to be pre-configured when it's passed,
> but I'm not sure we would need a windowSize config. I think if the
> constructor is passed the serde and the window size, then window size
> should be set within the deserializer. The only catch is if the Scala
> version of the consumer creates a new deserializer, and at that point we'd
> need a window size config, but I'm not sure if that's the case.
>
> WDYT - is it possible to alter the existing implicit and add a new one?
>
> On Wed, Aug 26, 2020 at 10:00 AM John Roesler <vv...@apache.org> wrote:
>
> > Hi Leah,
> >
> > I was just reviewing the PR for KIP-616 and realized that we
> > forgot to mention the Scala API in your KIP. We should
> > consider it because `scala.Serdes.timeWindowedSerde` is
> > implicitly using the exact constructor you're deprecating.
> >
> > I had some ideas in the code review:
> > https://github.com/apache/kafka/pull/8955#discussion_r477358755
> >
> > What do you think is the best approach?
> >
> > Concretely, I think Yuriy can make the call for KIP-616 (for
> > the new implicit that he's adding). But I think your KIP-659
> > should mention how we modify the existing implicit.
> >
> > Typically, we'd try to avoid throwing new exceptions or
> > causing compile errors, so
> > * dropping the implicit is probably off the table (compile
> > error).
> > * throwing an exception in the deserializer may not be ok,
> > although it might still actually be ok since it's adding a
> > corruption check.
> > * logging an ERROR message and then passing through to the
> > underlying deserializer would be more conservative.
> >
> > What do you think we should do?
> >
> > Thanks,
> > -John
> >
> > On Fri, 2020-08-21 at 16:05 -0500, Leah Thomas wrote:
> > > Thanks for the typo catch, John.
> > >
> > > Let me know if anyone else has thoughts or ideas.
> > >
> > > Cheers,
> > > Leah
> > >
> > > On Fri, Aug 21, 2020 at 2:50 PM John Roesler <vv...@apache.org>
> > > wrote:
> > >
> > > > Thanks, all,
> > > >
> > > > Based on my reading of the conversation, it sounds like I
> > > > have some legwork to do in KIP-645, but our collective
> > > > instinct is that Leah's proposal doesn't need to change to
> > > > account for whatever we might decide to do in KIP-645.
> > > >
> > > > I have no further concerns about KIP-659, and I think it's a
> > > > good proposal.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > P.s., there's still a typo on the wiki that says
> > > > "ConsumerConfig" on the code block, even though the text now
> > > > says "StreamsConfig".
> > > >
> > > >
> > > > On Fri, 2020-08-21 at 10:56 -0700, Sophie Blee-Goldman wrote:
> > > > > Just want to make a quick comment on the question that John raised
> > > > > about whether we should introduce a separate config for "key" and
> > > > > "value" window sizes:
> > > > >
> > > > > My short answer is no, I don't think that's necessary. First of all,
> > > > > as you said, there is no first-class concept of a "Windowed value" in
> > > > > the DSL. Second, to engage with your rhetorical question: if there's
> > > > > no default window size for a Streams program, then how can there be a
> > > > > sensible default for the key AND a separate sensible default for the
> > > > > value?
> > > > >
> > > > > I don't think we need to follow the existing pattern if it doesn't
> > > > > make sense, and to be honest I'm a bit skeptical that anyone was even
> > > > > using these default windowed inner classes, since the config wasn't
> > > > > even defined/documented until quite recently. I'd actually be in
> > > > > favor of deprecating
> > > > > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, but I don't
> > > > > want to drag that into this discussion as well.
> > > > >
> > > > > My understanding is that these were meant to mirror the default
> > > > > key/value serde configs, but the real use of the
> > > > > DEFAULT_WINDOWED_SERDE_INNER_CLASS config is actually that you can at
> > > > > least use it to configure the inner class for a Consumer, thus making
> > > > > the TimeWindowed serdes functional at a basic level. With the window
> > > > > size configs, the point is not really to set a default but to make it
> > > > > actually work with a Consumer, which instantiates the deserializer by
> > > > > reflection. So I don't think we should position this new config as a
> > > > > "default" (although it may technically behave as one); within
> > > > > Streams, users can and should always supply the window size through
> > > > > the constructor. I don't think that's much of an inconvenience
> > > > > compared with the amount of confusion that default serde-related
> > > > > configs in Streams have caused (and will cause).
> > > > >
> > > > > Regarding the fixed vs. variable-sized config, one idea I had was to
> > > > > just keep the fixed-size config and constructor and let users of
> > > > > enumerable windows override the TimeWindowedSerde class(es) to do
> > > > > whatever it is they need. IIUC you already have to override some
> > > > > other window-related classes to get variable-sized windows, so doing
> > > > > the same for the serdes sounds reasonable to me. That's just my take
> > > > > on the "simple things should be easy, difficult things should be
> > > > > possible" mantra.
> > > > >
> > > > > One last quick side note: the reason we don't really need to discuss
> > > > > SessionWindows here is that they already encode both the start and
> > > > > end time for the window. This is probably the best way to go for
> > > > > TimeWindows as well, but making this change in a backwards-compatible
> > > > > way is a much larger scope of work. And even then, we might want to
> > > > > consider making it possible to still just encode the start time to
> > > > > save space, thus requiring this config either way.
> > > > >
> > > > > On Fri, Aug 21, 2020 at 9:26 AM Leah Thomas <lt...@confluent.io> wrote:
> > > > > > Thanks John and Walker for your thoughts.
> > > > > >
> > > > > > I agree with your two scenarios, John: either you configure the
> > > > > > class fully in the constructor and don't need to call `init()`, or
> > > > > > you use the default constructor and configure it through `init()`.
> > > > > > IIUC, if we pass the deserializer to the consumer, we want to make
> > > > > > sure its window size is set using the newly required constructor.
> > > > > > If we don't pass in the deserializer, the window size will be set
> > > > > > through the configs. To answer Walker's question directly: because
> > > > > > the configs aren't passed to the constructor, we can't set the
> > > > > > window size unless we pass it to the constructor or configure the
> > > > > > deserializer after instantiating it.
> > > > > >
> > > > > > For users who would rather not set a strict window size (outside
> > > > > > of the variable-size scenario), they can pass in Long.MAX_VALUE.
> > > > > > The way I see it, instead of having the default cover scenarios
> > > > > > that don't require a window size, we make the default cover the
> > > > > > scenarios that *do*, flipping the current implementation to fit
> > > > > > typical use cases.
> > > > > >
> > > > > > On your points, John:
> > > > > > 1. I agree that it makes sense to store it in StreamsConfig; this
> > > > > > shouldn't cause any issues. I've updated the KIP accordingly.
> > > > > >
> > > > > > 2. The non-fixed time windows issue is a good point. Calendar
> > > > > > windows in particular seem quite useful, so I think we want to make
> > > > > > sure that this doesn't inhibit flexibly sized windows. I think
> > > > > > having two different configs and functions makes sense, although it
> > > > > > is slightly messier. While requiring all time windows to use the
> > > > > > WindowFunction constructor would work, I think that allowing users
> > > > > > to access the WindowSize constructor is preferable because it seems
> > > > > > easier to use for people who are not at all interested in delving
> > > > > > into variably sized windows. This assumption could be wrong,
> > > > > > though, and perhaps users would adapt quickly to the new
> > > > > > WindowFunction style, but my immediate reaction is to support both
> > > > > > configs and constructors.
> > > > > >
> > > > > > One note on this is that session windows are handled separately
> > > > > > from time windows and also have variable window sizes. I assume
> > > > > > that the TimeWindowed option is preferable for variably sized
> > > > > > windows because you still want to access the window end times? But
> > > > > > I think one alternative could be separating the variably sized
> > > > > > windows from the current implementation of time windows, although
> > > > > > I think KIP-645
> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> > > > > > would make this not strictly necessary.
> > > > > >
> > > > > > Cheers,
> > > > > > Leah
> > > > > >
> > > > > > On Fri, Aug 21, 2020 at 10:04 AM John Roesler <vvcephei@apache.org> wrote:
> > > > > > > Hi Leah,
> > > > > > >
> > > > > > > Thanks for the KIP! This has been a real pain for some use
> > > > > > > cases, so it's really good to see a proposal to fix it.
> > > > > > >
> > > > > > > We do need a default constructor so that it can be
> > > > > > > dynamically instantiated by the consumer (or any other
> > > > > > > component). But I'm +1 on deprecating the constructor you're
> > > > > > > proposing to deprecate, which only partially configures the
> > > > > > > class. It seems like there are exactly two patterns: either
> > > > > > > you fully configure the class in the constructor and don't
> > > > > > > call `init()`, or you call the default constructor and then
> > > > > > > configure the class by calling `init()`.
> > > > > > >
> > > > > > > I can appreciate Walker's point, but stepping back, it
> > > > > > > doesn't actually seem that useful to partially configure the
> > > > > > > class in the constructor and then finish up the
> > > > > > > configuration by calling `init()`. I could see the argument
> > > > > > > if there were a sensible default, but for this particular
> > > > > > > class, there isn't one. Rhetorical question: what is the
> > > > > > > default window size for Streams programs?
> > > > > > >
> > > > > > > I have a couple of concerns to discuss:
> > > > > > >
> > > > > > > 1. Config Location
> > > > > > >
> > > > > > > I don't think I would add the new configs to ConsumerConfig,
> > > > > > > but would add it to StreamsConfig instead. The deserializer
> > > > > > > itself is in Streams (it is
> > > > > > > o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
> > > > > > > odd to have one of its configurations in a completely
> > > > > > > different module.
> > > > > > >
> > > > > > > Also, this class already has two configs, which are in
> > > > > > > StreamsConfig:
> > > > > > > StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
> > > > > > > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> > > > > > >
> > > > > > > It seems like the new config belongs right next to the
> > > > > > > existing ones.
> > > > > > >
> > > > > > > For me, it raises a secondary question:
> > > > > > > 1b: Should there be a KEY_WINDOW_SIZE and a
> > > > > > > VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
> > > > > > > value" even is, but the fact that we can configure serdes
> > > > > > > for it implies that perhaps we should symmetrically
> > > > > > > configure its size as well.
> > > > > > >
> > > > > > > 2. Fixed Size Assumption
> > > > > > >
> > > > > > > In KIP-645, I'm proposing to lift the assumption that
> > > > > > > TimeWindows have a fixed size at all, but KIP-659 is
> > > > > > > currently built on that assumption.
> > > > > > >
> > > > > > > For details on why this is not a good assumption, see:
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-10408
> > > > > > >
> > > > > > > In fact, in my POC PR for KIP-659, I'm dropping the
> > > > > > > constructor that takes a "window size" parameter in favor of
> > > > > > > one that takes a window function, mapping a window start
> > > > > > > time to a full Window(start, end).
> > > > > > >
> > > > > > > In that context, it seems incongruous to introduce a
> > > > > > > configuration that specifies a window size. Of course, my
> > > > > > > KIP is also under discussion, so my proposal may not
> > > > > > > eventually be accepted. But it is necessary to consider both
> > > > > > > of these concerns together.
> > > > > > >
> > > > > > > One option seems to be to accept both. Namely, we keep the
> > > > > > > "fixed size" constructor AND add my new constructor (for
> > > > > > > variably sized windows). Likewise, we accept your proposal,
> > > > > > > and KIP-659 would propose to add a new config specifying a
> > > > > > > windowing function, such as:
> > > > > > >
> > > > > > > > StreamsConfig.WINDOW_FUNCTION_CONFIG
> > > > > > >
> > > > > > > which would be an instance of:
> > > > > > >
> > > > > > > > public interface WindowFunction extends Function<Long, Window> {}
> > > > > > >
> > > > > > > I'm not bringing these up for discussion in your KIP right
> > > > > > > now, just demonstrating the feasibility of merging both
> > > > > > > proposals.
> > > > > > >
> > > > > > > My question for you: do you think the general strategy of
> > > > > > > having two constructors and two configs, one for fixed and
> > > > > > > one for variable windows, makes sense? Is it too
> > > > > > > complicated? Do you have a better idea?
> > > > > > >
> > > > > > > Thanks!
> > > > > > > -John
> > > > > > >
> > > > > > > On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> > > > > > > > Hi Leah,
> > > > > > > >
> > > > > > > > Could you explain a bit more why we do not wish to let
> > > > > > > > TimeWindowedDeserializer and WindowedSerdes be created without
> > > > > > > > a specified time as a parameter?
> > > > > > > >
> > > > > > > > I understand that Long.MAX_VALUE could cause problems, but
> > > > > > > > wouldn't it be a good idea to have a usable default, or to
> > > > > > > > fetch it from the config if available? After all, you are
> > > > > > > > proposing to add "window.size.ms".
> > > > > > > >
> > > > > > > > We definitely need a fix to this problem, and adding
> > > > > > > > "window.size.ms" makes sense to me.
> > > > > > > >
> > > > > > > > Thanks for the KIP,
> > > > > > > > Walker
> > > > > > > >
> > > > > > > > On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <lthomas@confluent.io> wrote:
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I'd like to start a discussion for KIP-659:
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > > > > > > > >
> > > > > > > > > The goal of the KIP is to ensure that window size is passed
> > > > > > > > > to the consumer when needed, which will generally be for
> > > > > > > > > testing purposes, and to avoid runtime errors when the
> > > > > > > > > *TimeWindowedSerde* is created without a window size.
> > > > > > > > >
> > > > > > > > > Looking forward to hearing your feedback.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Leah
> >
> >
>
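The thread quoted above weighs a single fixed window size against the window-function idea John raises alongside KIP-645, where a function maps a window start time to a full window (KAFKA-10408 explains why sizes are not always fixed). As a rough illustration only, the Java sketch below contrasts the two. `SimpleWindow`, `WindowFunction`, `fixedSize`, and `calendarMonthUtc` are hypothetical names, not Kafka Streams APIs, and calendar months are assumed to be computed in UTC.

```java
import java.time.Instant;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.util.function.Function;

// Hypothetical stand-in for a Kafka Streams window: the half-open
// interval [startMs, endMs).
final class SimpleWindow {
    final long startMs;
    final long endMs;

    SimpleWindow(long startMs, long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }

    long sizeMs() {
        return endMs - startMs;
    }
}

// Hypothetical window function mapping a window start time to the full
// window. A Java interface uses `extends` (not `implements`) for Function.
interface WindowFunction extends Function<Long, SimpleWindow> {

    // Fixed-size windows: fully described by one window size value.
    static WindowFunction fixedSize(long sizeMs) {
        return start -> new SimpleWindow(start, start + sizeMs);
    }

    // Calendar-month windows in UTC, assuming `start` falls on a month
    // boundary. The size depends on the start, so no single size fits.
    static WindowFunction calendarMonthUtc() {
        return start -> {
            YearMonth month = YearMonth.from(Instant.ofEpochMilli(start).atZone(ZoneOffset.UTC));
            long end = month.plusMonths(1)
                .atDay(1)
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
            return new SimpleWindow(start, end);
        };
    }
}

class WindowFunctionDemo {
    public static void main(String[] args) {
        long jan = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
        long feb = Instant.parse("2020-02-01T00:00:00Z").toEpochMilli();
        WindowFunction monthly = WindowFunction.calendarMonthUtc();
        System.out.println(monthly.apply(jan).sizeMs() / 86_400_000L); // prints 31
        System.out.println(monthly.apply(feb).sizeMs() / 86_400_000L); // prints 29 (2020 is a leap year)
        System.out.println(WindowFunction.fixedSize(60_000L).apply(feb).sizeMs()); // prints 60000
    }
}
```

A fixed-size config covers the first case directly; the calendar-month case is why a single size value cannot describe every windowing scheme.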

Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Posted by Leah Thomas <lt...@confluent.io>.
Hey John,

Thanks for pointing this out; I wasn't sure how to handle the Scala changes.

I'm not fully versed in the Scala version of Streams, so feel free to
correct me if any of my assumptions are wrong. I think logging an error
message and then calling the constructor that requires a windowSize seems
like the simplest fix from my point of view. So instead of
calling `TimeWindowedSerde(final Serde<T> inner)`, we could
call `TimeWindowedSerde(final Serde<T> inner, final long windowSize)` with
Long.MAX_VALUE as the window size.
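A minimal Java sketch of the fallback described above, assuming a simplified deserializer rather than the real `TimeWindowedDeserializer`: the no-size constructor logs an error and delegates to the sized constructor with `Long.MAX_VALUE`. The byte layout (inner key followed by an 8-byte start time) is an assumption chosen to show why the end time has to be derived from a size, and clamping on overflow is a design choice of this sketch. `SketchTimeWindowedDeserializer` is hypothetical, and `java.util.logging` is used only to keep the example dependency-free.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.logging.Logger;

// Hypothetical, simplified time-windowed deserializer (not the Kafka class).
// The bytes hold the inner key followed by only the 8-byte window start,
// so the window end must be derived from a window size.
final class SketchTimeWindowedDeserializer {
    private static final Logger LOG =
        Logger.getLogger(SketchTimeWindowedDeserializer.class.getName());

    private final long windowSizeMs; // assumed non-negative

    // Old-style constructor with no window size: log an error, then delegate
    // to the sized constructor with Long.MAX_VALUE.
    @Deprecated
    SketchTimeWindowedDeserializer() {
        this(Long.MAX_VALUE);
        LOG.severe("No window size provided; falling back to Long.MAX_VALUE");
    }

    SketchTimeWindowedDeserializer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Serializer side of the sketch: inner key bytes + 8-byte start time.
    static byte[] serialize(String key, long startMs) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
            .put(keyBytes)
            .putLong(startMs)
            .array();
    }

    String keyOf(byte[] data) {
        return new String(data, 0, data.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    long startOf(byte[] data) {
        return ByteBuffer.wrap(data).getLong(data.length - Long.BYTES);
    }

    // End = start + size, clamped to Long.MAX_VALUE instead of overflowing.
    long endOf(byte[] data) {
        long start = startOf(data);
        long end = start + windowSizeMs;
        return end < start ? Long.MAX_VALUE : end;
    }
}

class DeserializerDemo {
    public static void main(String[] args) {
        byte[] bytes = SketchTimeWindowedDeserializer.serialize("user-1", 10_000L);
        SketchTimeWindowedDeserializer sized = new SketchTimeWindowedDeserializer(5_000L);
        System.out.println(sized.keyOf(bytes) + " " + sized.endOf(bytes)); // prints "user-1 15000"
        SketchTimeWindowedDeserializer unsized = new SketchTimeWindowedDeserializer();
        System.out.println(unsized.endOf(bytes) == Long.MAX_VALUE); // prints "true"
    }
}
```

Without the clamp, any start time above zero plus `Long.MAX_VALUE` would wrap around to a negative end time.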

I do feel like we would want to add an implicit to `Serdes.scala` that
takes a serde and a window size so that users can access the constructor
that initializes with the correct window size. I agree with your comment on
the KIP-616 PR that the serde needs to be pre-configured when it's passed,
but I'm not sure we would need a windowSize config. I think if the
constructor is passed the serde and the window size, then the window size
should be set within the deserializer. The only catch is if the Scala
version of the consumer creates a new deserializer, and at that point we'd
need a window size config, but I'm not sure if that's the case.
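The two construction paths can be made concrete with a hedged Java sketch. A pre-built instance carries its window size from the constructor. An instance created by reflection through a no-arg constructor, the way a consumer instantiates a configured deserializer class, must instead read the size from a config map. The class name, the `instantiate` helper, and the simplified `configure(Map)` signature are hypothetical; only the `window.size.ms` name comes from the discussion.

```java
import java.util.Map;

// Hypothetical deserializer supporting both construction patterns from the
// thread: fully configured via a constructor, or created through a no-arg
// constructor (as a consumer does by reflection) and then configured.
final class ConfigurableWindowedDeserializer {
    // Config name discussed in the KIP; its final name and location are assumed.
    static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";

    private Long windowSizeMs; // null until a size is supplied

    public ConfigurableWindowedDeserializer() { }

    public ConfigurableWindowedDeserializer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Loosely modeled on Deserializer#configure. A size given to the
    // constructor wins over the config, so a pre-built instance is unaffected.
    public void configure(Map<String, ?> configs) {
        if (windowSizeMs != null) {
            return;
        }
        Object value = configs.get(WINDOW_SIZE_MS_CONFIG);
        if (value == null) {
            throw new IllegalArgumentException(WINDOW_SIZE_MS_CONFIG + " is not set");
        }
        windowSizeMs = Long.parseLong(value.toString());
    }

    long windowSizeMs() {
        if (windowSizeMs == null) {
            throw new IllegalStateException("window size was never configured");
        }
        return windowSizeMs;
    }

    // Mimics what a client does with a configured class: reflect, then configure.
    static ConfigurableWindowedDeserializer instantiate(
            Class<? extends ConfigurableWindowedDeserializer> cls, Map<String, ?> configs) {
        try {
            ConfigurableWindowedDeserializer d = cls.getDeclaredConstructor().newInstance();
            d.configure(configs);
            return d;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not instantiate " + cls.getName(), e);
        }
    }
}

class ConfigDemo {
    public static void main(String[] args) {
        Map<String, Object> props =
            Map.of(ConfigurableWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "60000");
        ConfigurableWindowedDeserializer viaReflection =
            ConfigurableWindowedDeserializer.instantiate(ConfigurableWindowedDeserializer.class, props);
        ConfigurableWindowedDeserializer preBuilt = new ConfigurableWindowedDeserializer(5_000L);
        preBuilt.configure(props);
        System.out.println(viaReflection.windowSizeMs()); // prints 60000
        System.out.println(preBuilt.windowSizeMs());      // prints 5000
    }
}
```

Letting the constructor argument take precedence matches the idea that a deserializer passed in pre-built should keep the size it was given.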

WDYT - is it possible to alter the existing implicit and add a new one?

On Wed, Aug 26, 2020 at 10:00 AM John Roesler <vv...@apache.org> wrote:

> Hi Leah,
>
> I was just reviewing the PR for KIP-616 and realized that we
> forgot to mention the Scala API in your KIP. We should
> consider it because `scala.Serdes.timeWindowedSerde` is
> implicitly using the exact constructor you're deprecating.
>
> I had some ideas in the code review:
> https://github.com/apache/kafka/pull/8955#discussion_r477358755
>
> What do you think is the best approach?
>
> Concretely, I think Yuriy can make the call for KIP-616 (for
> the new implicit that he's adding). But I think your KIP-659
> should mention how we modify the existing implicit.
>
> Typically, we'd try to avoid throwing new exceptions or
> causing compile errors, so
> * dropping the implicit is probably off the table (compile
> error).
> * throwing an exception in the deserializer may not be ok,
> although it might still actually be ok since it's adding a
> corruption check.
> * logging an ERROR message and then passing through to the
> underlying deserializer would be more conservative.
>
> What do you think we should do?
>
> Thanks,
> -John
>

Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Posted by John Roesler <vv...@apache.org>.
Hi Leah,

I was just reviewing the PR for KIP-616 and realized that we
forgot to mention the Scala API in your KIP. We should
consider it because `scala.Serdes.timeWindowedSerde` is
implicitly using the exact constructor you're deprecating.

I had some ideas in the code review: 
https://github.com/apache/kafka/pull/8955#discussion_r477358755

What do you think is the best approach?

Concretely, I think Yuriy can make the call for KIP-616 (for
the new implicit that he's adding). But I think your KIP-659 
should mention how we modify the existing implicit.

Typically, we'd try to avoid throwing new exceptions or
causing compile errors, so
* dropping the implicit is probably off the table (compile
error).
* throwing an exception in the deserializer may not be ok,
although it might still actually be ok since it's adding a
corruption check.
* logging an ERROR message and then passing through to the
underlying deserializer would be more conservative.
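The last two options above differ only in what happens when no window size was supplied. Here is a small Java sketch of both. The names `MissingSizePolicy` and `WindowEndResolver` are hypothetical. The sketch assumes the pass-through path treats the window as unbounded, and `java.util.logging`'s SEVERE level stands in for an SLF4J ERROR.

```java
import java.util.logging.Logger;

// Two of the options listed above for a deserializer created without a
// window size (hypothetical names; not the Kafka Streams API).
enum MissingSizePolicy { THROW, LOG_AND_PASS_THROUGH }

final class WindowEndResolver {
    private static final Logger LOG = Logger.getLogger(WindowEndResolver.class.getName());

    private final Long windowSizeMs; // null when no size was supplied
    private final MissingSizePolicy policy;

    WindowEndResolver(Long windowSizeMs, MissingSizePolicy policy) {
        this.windowSizeMs = windowSizeMs;
        this.policy = policy;
    }

    long endFor(long startMs) {
        if (windowSizeMs == null) {
            if (policy == MissingSizePolicy.THROW) {
                // Stricter option: fail fast, similar to a corruption check.
                throw new IllegalStateException("window size not set; cannot compute window end");
            }
            // Conservative option: log at SEVERE and keep going, treating the
            // window as unbounded (an assumed fallback).
            LOG.severe("window size not set; using Long.MAX_VALUE as the window end");
            return Long.MAX_VALUE;
        }
        long end = startMs + windowSizeMs;
        return end < startMs ? Long.MAX_VALUE : end; // clamp on overflow
    }
}

class PolicyDemo {
    public static void main(String[] args) {
        System.out.println(new WindowEndResolver(1_000L, MissingSizePolicy.THROW).endFor(0L)); // prints 1000
        long lenient = new WindowEndResolver(null, MissingSizePolicy.LOG_AND_PASS_THROUGH).endFor(0L);
        System.out.println(lenient == Long.MAX_VALUE); // prints true
        try {
            new WindowEndResolver(null, MissingSizePolicy.THROW).endFor(0L);
        } catch (IllegalStateException e) {
            System.out.println("threw: " + e.getMessage());
        }
    }
}
```

The pass-through variant never breaks an existing application, which is why it is the more conservative choice; the cost is that a missing size only shows up in the logs.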

What do you think we should do?

Thanks,
-John

On Fri, 2020-08-21 at 16:05 -0500, Leah Thomas wrote:
> Thanks for the typo catch, John.
> 
> Let me know if anyone else has thoughts or ideas.
> 
> Cheers,
> Leah
> 
> On Fri, Aug 21, 2020 at 2:50 PM John Roesler <vv...@apache.org> wrote:
> 
> > Thanks, all,
> > 
> > Based on my reading of the conversation, it sounds like I
> > have some legwork to do in KIP-645, but our collective
> > instinct is that Leah's proposal doesn't need to change to
> > account for whatever we might decide to do in KIP-645.
> > 
> > I have no further concerns about KIP-645, and I think it's a
> > good proposal.
> > 
> > Thanks,
> > -John
> > 
> > P.s., there's still a typo on the wiki that says
> > "ConsumerConfig" on the code block, even though the text now
> > says "StreamsConfig".
> > 
> > 
> > On Fri, 2020-08-21 at 10:56 -0700, Sophie Blee-Goldman wrote:
> > > Just want to make a quick comment on the question that John raised
> > > about whether we should introduce a separate config for "key" and
> > > "value" window sizes:
> > >
> > > My short answer is No, I don't think that's necessary. First of all,
> > > as you said, there is no first-class concept of a "Windowed value" in
> > > the DSL. Second, to engage in your rhetorical question, if there's no
> > > default window size for a Streams program, then how can there be a
> > > sensible default for the key AND a separate sensible default for a
> > > value?
> > >
> > > I don't think we need to follow the existing pattern if it doesn't
> > > make sense, and to be honest I'm a bit skeptical that anyone was even
> > > using these default windowed inner classes, since the config wasn't
> > > even defined/documented until quite recently. I'd actually be in favor
> > > of deprecating StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS,
> > > but I don't want to drag that into this discussion as well.
> > >
> > > My understanding is that these were meant to mirror the default
> > > key/value serde configs, but the real use of the
> > > DEFAULT_WINDOWED_SERDE_INNER_CLASS config is actually that you can at
> > > least use it to configure the inner class for a Consumer, thus making
> > > the TimeWindowed serdes functional at a basic level. With the window
> > > size configs, the point is not really to set a default but to make it
> > > actually work with a Consumer, which instantiates the deserializer by
> > > reflection. So I don't think we should position this new config as a
> > > "default" (although it may technically behave as one) -- within
> > > Streams, users can and should always supply the window size through
> > > the constructor. I don't think that's such an inconvenience, compared
> > > to the confusion that default serde-related configs in Streams will
> > > cause (and already have caused).
> > >
> > > Regarding the fixed vs. variable sized config, one idea I had was to
> > > just keep the fixed-size config and constructor and let users of
> > > enumerable windows override the TimeWindowedSerde class(es) to do
> > > whatever it is they need. IIUC, you already have to override some
> > > other windows-related classes to get variable-sized windows, so doing
> > > the same for the serdes sounds reasonable to me. Just my take on the
> > > "simple things should be easy, difficult things should be possible"
> > > mantra.
> > >
> > > One last quick side note: the reason we don't really need to discuss
> > > SessionWindows here is that they already encode both the start and
> > > end time for the window. This is probably the best way to go for
> > > TimeWindows as well, but making this change in a backwards compatible
> > > way is a much larger scope of work. And even then, we might want to
> > > consider making it possible to still just encode the start time to
> > > save space, thus requiring this config either way.
> > >
> > > On Fri, Aug 21, 2020 at 9:26 AM Leah Thomas <lt...@confluent.io> wrote:
> > > > Thanks John and Walker for your thoughts.
> > > >
> > > > I agree with your two scenarios, John: either you configure fully in
> > > > the constructor and don't need to call `init()`, or you use the
> > > > default constructor and call `init()`. IIUC, if we pass the
> > > > deserializer to the consumer, we want to make sure it has the window
> > > > size set using the newly required constructor. If we don't pass in
> > > > the deserializer, the window size will be set through the configs. To
> > > > answer Walker's question directly: because the configs aren't passed
> > > > to the constructor, we can't set the window size unless we pass it to
> > > > the constructor or configure the deserializer after constructing it.
> > > >
> > > > For users who would rather not set a strict window size (outside of
> > > > the variable size scenario), they can pass in Long.MAX_VALUE. The way
> > > > I see it, instead of having the default be for scenarios that don't
> > > > require a window size, we have the default be the scenarios that
> > > > *do*, flipping the current implementation to fit typical use cases.
> > > >
> > > > On your points John:
> > > > 1. I agree that it makes sense to store it in StreamsConfig; this
> > > > shouldn't cause any issues. I've updated the KIP accordingly.
> > > >
> > > > 2. The non-fixed time windows issue is a good point. It seems like
> > > > calendar windows in particular are quite useful, so I think we want
> > > > to make sure that this wouldn't inhibit flexibly sized windows. I
> > > > think having two different configs and functions makes sense,
> > > > although it is slightly messier. While requiring all time windows to
> > > > use the WindowFunction constructor would work, I think that allowing
> > > > users to access the WindowSize constructor is preferable because it
> > > > seems easier to use for people who are not at all interested in
> > > > delving into variably sized windows. This assumption could be wrong
> > > > though, and perhaps users would adapt quickly to the new
> > > > WindowFunction style, but my immediate reaction is to support both
> > > > configs and constructors.
> > > >
> > > > One note on this is that Session Windows are handled separately from
> > > > time windows and also have variable window sizes. I assume that the
> > > > TimeWindowed option is preferable for variably sized windows because
> > > > you still want to access the window end times? But I think one
> > > > alternative could be separating the variably sized windows from the
> > > > current implementation of time windows, although I think KIP-645
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> > > > would make this not strictly necessary.
> > > >
> > > > Cheers,
> > > > Leah
> > > >
> > > > On Fri, Aug 21, 2020 at 10:04 AM John Roesler <vv...@apache.org> wrote:
> > > > > Hi Leah,
> > > > > 
> > > > > Thanks for the KIP! This has been a real pain for some use
> > > > > cases, so it's really good to see a proposal to fix it.
> > > > > 
> > > > > We do need a default constructor so that it can be
> > > > > dynamically instantiated by the consumer (or any other
> > > > > component). But I'm +1 on deprecating the constructor you're
> > > > > proposing to deprecate, which only partially configures the
> > > > > class. It seems like there are exactly two patterns: either
> > > > > you fully configure the class in the constructor and don't
> > > > > call `init()`, or you call the default constructor and then
> > > > > configure the class by calling `init()`.
> > > > > 
> > > > > I can appreciate Walker's point, but stepping back, it
> > > > > doesn't actually seem that useful to partially configure the
> > > > > class in the constructor and then finish up the
> > > > > configuration by calling `init()`. I could see the argument
> > > > > if there were a sensible default, but for this particular
> > > > > class, there isn't one. Rhetorical question: what is the
> > > > > default window size for Streams programs?
> > > > > 
> > > > > I have a couple of concerns to discuss:
> > > > > 
> > > > > 1. Config Location
> > > > > 
> > > > > I don't think I would add the new configs to ConsumerConfig,
> > > > > but would add them to StreamsConfig instead. The deserializer
> > > > > itself is in Streams (it is
> > > > > o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
> > > > > odd to have one of its configurations in a completely
> > > > > different module.
> > > > > 
> > > > > Also, this class already has two configs, which are in
> > > > > StreamsConfig:
> > > > > StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
> > > > > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> > > > > 
> > > > > It seems like the new config belongs right next to the
> > > > > existing ones.
> > > > > 
> > > > > For me, it raises a secondary question:
> > > > > 1b: Should there be a KEY_WINDOW_SIZE and a
> > > > > VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
> > > > > value" even is, but the fact that we can configure serdes
> > > > > for it implies that perhaps we should symmetrically
> > > > > configure its size as well.
> > > > > 
> > > > > 2. Fixed Size Assumption
> > > > > 
> > > > > In KIP-645, I'm proposing to lift the assumption that
> > > > > TimeWindows have a fixed size at all, but KIP-659 is
> > > > > currently built on that assumption.
> > > > > 
> > > > > For details on why this is not a good assumption, see:
> > > > > https://issues.apache.org/jira/browse/KAFKA-10408
> > > > > 
> > > > > In fact, in my POC PR for KIP-645, I'm dropping the
> > > > > constructor that takes a "window size" parameter in favor of
> > > > > one that takes a window function, mapping a window start
> > > > > time to a full Window(start, end).
> > > > > 
> > > > > In that context, it seems incongruous to introduce a
> > > > > configuration that specifies a window size. Of course, my
> > > > > KIP is also under discussion, so my proposal may not
> > > > > eventually be accepted. But it is necessary to consider both
> > > > > of these concerns together.
> > > > > 
> > > > > One option seems to be to accept both. Namely, we keep the
> > > > > "fixed size" constructor AND add my new constructor (for
> > > > > variably sized windows). Likewise, we accept your proposal,
> > > > > and KIP-645 would propose to add a new config specifying a
> > > > > windowing function, such as:
> > > > > 
> > > > > > StreamsConfig.WINDOW_FUNCTION_CONFIG
> > > > > 
> > > > > which would be an instance of:
> > > > > 
> > > > > > public interface WindowFunction extends Function<Long, Window> {}
> > > > > 
> > > > > I'm not bringing these up for discussion in your KIP right
> > > > > now, just demonstrating the feasibility of merging both
> > > > > proposals.
> > > > > 
> > > > > My question for you: do you think the general strategy of
> > > > > having two constructors and two configs, one for fixed and
> > > > > one for variable windows, makes sense? Is it too
> > > > > complicated? Do you have a better idea?
> > > > > 
> > > > > Thanks!
> > > > > -John
> > > > > 
> > > > > On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> > > > > > Hi Leah,
> > > > > > 
> > > > > > Could you explain a bit more why we do not wish to let
> > > > > > TimeWindowedDeserializer and WindowedSerdes be created without a
> > > > > > specified time as a parameter?
> > > > > >
> > > > > > I understand that Long.MAX_VALUE could cause problems, but would it
> > > > > > not be a good idea to have a usable default, or to fetch it from the
> > > > > > config if available? After all, you are proposing to add
> > > > > > "window.size.ms".
> > > > > >
> > > > > > We definitely need a fix to this problem, and adding
> > > > > > "window.size.ms" makes sense to me.
> > > > > >
> > > > > > Thanks for the KIP,
> > > > > > Walker
> > > > > >
> > > > > > On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <lt...@confluent.io> wrote:
> > > > > > > Hi all,
> > > > > > > 
> > > > > > > I'd like to start a discussion for KIP-659:
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > > > > > >
> > > > > > > The goal of the KIP is to ensure that window size is passed to the
> > > > > > > consumer when needed, which will generally be for testing
> > > > > > > purposes, and to avoid runtime errors when the
> > > > > > > *TimeWindowedSerde* is created without a window size.
> > > > > > > 
> > > > > > > Looking forward to hearing your feedback.
> > > > > > > 
> > > > > > > Cheers,
> > > > > > > Leah
> > > > > > > 
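The two configuration patterns John describes in the quoted thread (configure fully through the constructor, or use a no-argument constructor and configure afterwards), together with Walker's concern about Long.MAX_VALUE, can be illustrated with a toy deserializer. This is a minimal sketch under stated assumptions, not Kafka's TimeWindowedDeserializer: the class names, the single `configure(Map)` step (standing in for the `init()` call mentioned in the thread), and the input format (UTF-8 key bytes followed by an 8-byte window start) are hypothetical. Only the config name "window.size.ms" comes from the discussion.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical result type: the inner key plus the window bounds.
final class SketchWindowedKey {
    final String key;
    final long startMs;
    final long endMs;

    SketchWindowedKey(String key, long startMs, long endMs) {
        this.key = key;
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// Toy deserializer, NOT Kafka's TimeWindowedDeserializer.
// Assumed input format: UTF-8 key bytes followed by an 8-byte window start.
final class SketchTimeWindowedDeserializer {
    // Config name taken from the thread; everything else here is hypothetical.
    static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";

    private Long windowSizeMs; // stays null until a size is supplied

    // Pattern 2: no-arg constructor, so a consumer can instantiate it by reflection.
    SketchTimeWindowedDeserializer() { }

    // Pattern 1: fully configured up front; no configure() call needed.
    SketchTimeWindowedDeserializer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Pattern 2, second step: pick up the size from the config map.
    // A size passed to the constructor takes precedence.
    void configure(Map<String, ?> configs) {
        Object size = configs.get(WINDOW_SIZE_MS_CONFIG);
        if (windowSizeMs == null && size != null) {
            windowSizeMs = Long.parseLong(size.toString());
        }
    }

    SketchWindowedKey deserialize(byte[] data) {
        if (windowSizeMs == null) {
            // Fail loudly rather than guess a window size.
            throw new IllegalStateException("window size was never configured");
        }
        ByteBuffer buf = ByteBuffer.wrap(data);
        byte[] keyBytes = new byte[data.length - Long.BYTES];
        buf.get(keyBytes);
        long startMs = buf.getLong();
        long endMs;
        try {
            endMs = Math.addExact(startMs, windowSizeMs.longValue());
        } catch (ArithmeticException overflow) {
            // A size such as Long.MAX_VALUE would overflow; clamp instead of wrapping negative.
            endMs = Long.MAX_VALUE;
        }
        return new SketchWindowedKey(new String(keyBytes, StandardCharsets.UTF_8), startMs, endMs);
    }

    // Example helper: encode a key and window start in the assumed format.
    static byte[] encode(String key, long startMs) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(startMs)
                .array();
    }
}
```

In this sketch an unconfigured instance throws on first use instead of silently assuming a size, and an overflowing end time is clamped; both are illustrative design choices, not statements about what KIP-659 specifies.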


Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Posted by Leah Thomas <lt...@confluent.io>.
Thanks for the typo catch, John.

Let me know if anyone else has thoughts or ideas.

Cheers,
Leah

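John's idea in this thread of keeping a fixed-size option alongside a window function that maps a window start time to a full window can be sketched as follows. `SketchWindow` and `SketchWindowFunction` are hypothetical stand-ins for the thread's `Window` and `WindowFunction`, and the calendar-month variant assumes UTC and a start at the first instant of a month. It is meant only to show why a single "window.size.ms" value cannot describe variably sized windows such as the calendar windows Leah mentions.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.function.Function;

// Hypothetical window bounds: [startMs, endMs).
final class SketchWindow {
    final long startMs;
    final long endMs;

    SketchWindow(long startMs, long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// Hypothetical stand-in for the thread's WindowFunction: maps a window start
// to a full window. (A Java interface extends Function; it cannot implement it.)
interface SketchWindowFunction extends Function<Long, SketchWindow> {

    // Fixed-size windows: what a single window.size.ms value can describe.
    static SketchWindowFunction fixedSize(long sizeMs) {
        return startMs -> new SketchWindow(startMs, startMs + sizeMs);
    }

    // Calendar-month windows in UTC: 28 to 31 days long, so no single size fits.
    // Assumes startMs is the first instant of a month.
    static SketchWindowFunction calendarMonthUtc() {
        return startMs -> {
            ZonedDateTime begin = Instant.ofEpochMilli(startMs).atZone(ZoneOffset.UTC);
            long endMs = begin.plusMonths(1).toInstant().toEpochMilli();
            return new SketchWindow(startMs, endMs);
        };
    }
}
```

Under this sketch the fixed-size case is just one particular window function, which is the sense in which a size-based and a function-based constructor or config could coexist.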

Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Posted by John Roesler <vv...@apache.org>.
Thanks, all,

Based on my reading of the conversation, it sounds like I
have some legwork to do in KIP-645, but our collective
instinct is that Leah's proposal doesn't need to change to
account for whatever we might decide to do in KIP-645.

I have no further concerns about KIP-659, and I think it's a
good proposal.

Thanks,
-John

P.s., there's still a typo on the wiki that says
"ConsumerConfig" on the code block, even though the text now
says "StreamsConfig".
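Sophie's side note in this thread (session windows already encode both the start and end time, while time windows encode only the start) explains why a window size has to come from somewhere at all. A minimal sketch of the two layouts follows. The class and method names are hypothetical, and the field order and widths are assumptions loosely modeled on that description, not a specification of Kafka's actual serialized format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical encoders contrasting the two key layouts described in the thread.
// Field order and widths are assumptions, not Kafka's actual format.
final class SketchWindowedKeyLayouts {

    // Time-windowed: key bytes + 8-byte start. The end is not stored,
    // so a reader needs the window size (hence a window.size.ms config).
    static byte[] encodeTimeWindowed(String key, long startMs) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(k.length + Long.BYTES)
                .put(k).putLong(startMs).array();
    }

    // Session-windowed: key bytes + 8-byte start + 8-byte end.
    // Both bounds are stored, so decoding needs no size config.
    static byte[] encodeSessionWindowed(String key, long startMs, long endMs) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(k.length + 2 * Long.BYTES)
                .put(k).putLong(startMs).putLong(endMs).array();
    }

    // Recovers {start, end} from the session layout using the bytes alone.
    static long[] decodeSessionBounds(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data, data.length - 2 * Long.BYTES, 2 * Long.BYTES);
        long startMs = buf.getLong();
        long endMs = buf.getLong();
        return new long[] {startMs, endMs};
    }
}
```

The eight extra bytes per key in the session layout are the space cost Sophie alludes to when suggesting it might still be worth encoding only the start time.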


On Fri, 2020-08-21 at 10:56 -0700, Sophie Blee-Goldman wrote:
> Just want to make a quick comment on the question that John raised about
> whether we should introduce a separate config for "key" and "value"
> window sizes:
>
> My short answer is No, I don't think that's necessary. First of all, as
> you said, there is no first-class concept of a "Windowed value" in the
> DSL. Second, to engage in your rhetorical question, if there's no default
> window size for a Streams program, then how can there be a sensible
> default for the key AND a separate sensible default for a value?
>
> I don't think we need to follow the existing pattern if it doesn't make
> sense, and to be honest I'm a bit skeptical that anyone was even using
> these default windowed inner classes, since the config wasn't even
> defined/documented until quite recently. I'd actually be in favor of
> deprecating StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, but I
> don't want to drag that into this discussion as well.
>
> My understanding is that these were meant to mirror the default key/value
> serde configs, but the real use of the DEFAULT_WINDOWED_SERDE_INNER_CLASS
> config is actually that you can at least use it to configure the inner
> class for a Consumer, thus making the TimeWindowed serdes functional at a
> basic level. With the window size configs, the point is not really to set
> a default but to make it actually work with a Consumer, which instantiates
> the deserializer by reflection. So I don't think we should position this
> new config as a "default" (although it may technically behave as one) --
> within Streams, users can and should always supply the window size
> through the constructor. I don't think that's such an inconvenience,
> compared to the confusion that default serde-related configs in Streams
> will cause (and already have caused).
>
> Regarding the fixed vs. variable sized config, one idea I had was to just
> keep the fixed-size config and constructor and let users of enumerable
> windows override the TimeWindowedSerde class(es) to do whatever it is
> they need. IIUC, you already have to override some other windows-related
> classes to get variable-sized windows, so doing the same for the serdes
> sounds reasonable to me. Just my take on the "simple things should be
> easy, difficult things should be possible" mantra.
>
> One last quick side note: the reason we don't really need to discuss
> SessionWindows here is that they already encode both the start and end
> time for the window. This is probably the best way to go for TimeWindows
> as well, but making this change in a backwards compatible way is a much
> larger scope of work. And even then, we might want to consider making it
> possible to still just encode the start time to save space, thus
> requiring this config either way.
> 
> On Fri, Aug 21, 2020 at 9:26 AM Leah Thomas <lt...@confluent.io> wrote:
> 
> > Thanks John and Walker for your thoughts.
> > 
> > I agree with your two scenarios, John: either you configure fully in
> > the constructor and don't need to call `init()`, or you use the default
> > constructor and call `init()`. IIUC, if we pass the deserializer to the
> > consumer, we want to make sure it has the window size set using the
> > newly required constructor. If we don't pass in the deserializer, the
> > window size will be set through the configs. To answer Walker's question
> > directly: because the configs aren't passed to the constructor, we can't
> > set the window size unless we pass it to the constructor or configure
> > the deserializer after constructing it.
> >
> > For users who would rather not set a strict window size (outside of the
> > variable size scenario), they can pass in Long.MAX_VALUE. The way I see
> > it, instead of having the default be for scenarios that don't require a
> > window size, we have the default be the scenarios that *do*, flipping
> > the current implementation to fit typical use cases.
> >
> > On your points John:
> > 1. I agree that it makes sense to store it in StreamsConfig; this
> > shouldn't cause any issues. I've updated the KIP accordingly.
> >
> > 2. The non-fixed time windows issue is a good point. It seems like
> > calendar windows in particular are quite useful, so I think we want to
> > make sure that this wouldn't inhibit flexibly sized windows. I think
> > having two different configs and functions makes sense, although it is
> > slightly messier. While requiring all time windows to use the
> > WindowFunction constructor would work, I think that allowing users to
> > access the WindowSize constructor is preferable because it seems easier
> > to use for people who are not at all interested in delving into variably
> > sized windows. This assumption could be wrong though, and perhaps users
> > would adapt quickly to the new WindowFunction style, but my immediate
> > reaction is to support both configs and constructors.
> >
> > One note on this is that Session Windows are handled separately from
> > time windows and also have variable window sizes. I assume that the
> > TimeWindowed option is preferable for variably sized windows because you
> > still want to access the window end times? But I think one alternative
> > could be separating the variably sized windows from the current
> > implementation of time windows, although I think KIP-645
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> > would make this not strictly necessary.
> > 
> > Cheers,
> > Leah
> > 
> > On Fri, Aug 21, 2020 at 10:04 AM John Roesler <vv...@apache.org> wrote:
> > 
> > > Hi Leah,
> > > 
> > > Thanks for the KIP! This has been a real pain for some use
> > > cases, so it's really good to see a proposal to fix it.
> > > 
> > > We do need a default constructor so that it can be
> > > dynamically instantiated by the consumer (or any other
> > > component). But I'm +1 on deprecating the constructor you're
> > > proposing to deprecate, which only partially configures the
> > > class. It seems like there are exactly two patterns: either
> > > you fully configure the class in the constructor and don't
> > > call `init()`, or you call the default constructor and then
> > > configure the class by calling `init()`.
> > > 
> > > I can appreciate Walker's point, but stepping back, it
> > > doesn't actually seem that useful to partially configure the
> > > class in the constructor and then finish up the
> > > configuration by calling `init()`. I could see the argument
> > > if there were a sensible default, but for this particular
> > > class, there isn't one. Rhetorical question: what is the
> > > default window size for Streams programs?
> > > 
> > > I have a couple of concerns to discuss:
> > > 
> > > 1. Config Location
> > > 
> > > I don't think I would add the new configs to ConsumerConfig,
> > > but would add them to StreamsConfig instead. The deserializer
> > > itself is in Streams (it is
> > > o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
> > > odd to have one of its configurations in a completely
> > > different module.
> > > 
> > > Also, this class already has two configs, which are in
> > > StreamsConfig:
> > > StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
> > > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> > > 
> > > It seems like the new config belongs right next to the
> > > existing ones.
> > > 
> > > For me, it raises a secondary question:
> > > 1b: Should there be a KEY_WINDOW_SIZE and a
> > > VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
> > > value" even is, but the fact that we can configure serdes
> > > for it implies that perhaps we should symmetrically
> > > configure its size as well.
> > > 
> > > 2. Fixed Size Assumption
> > > 
> > > In KIP-645, I'm proposing to lift the assumption that
> > > TimeWindows have a fixed size at all, but KIP-659 is
> > > currently built on that assumption.
> > > 
> > > For details on why this is not a good assumption, see:
> > > https://issues.apache.org/jira/browse/KAFKA-10408
> > > 
> > > In fact, in my POC PR for KIP-645, I'm dropping the
> > > constructor that takes a "window size" parameter in favor of
> > > one that takes a window function, mapping a window start
> > > time to a full Window(start, end).
> > > 
> > > In that context, it seems incongruous to introduce a
> > > configuration that specifies a window size. Of course, my
> > > KIP is also under discussion, so my proposal may not
> > > eventually be accepted. But it is necessary to consider both
> > > of these concerns together.
> > > 
> > > One option seems to be to accept both. Namely, we keep the
> > > "fixed size" constructor AND add my new constructor (for
> > > variably sized windows). Likewise, we accept your proposal,
> > > and KIP-659 would propose to add a new config specifying a
> > > windowing function, such as:
> > > 
> > > > StreamsConfig.WINDOW_FUNCTION_CONFIG
> > > 
> > > which would be an instance of:
> > > 
> > > > public interface WindowFunction extends Function<Long, Window> {}
> > > 
> > > I'm not bringing these up for discussion in your KIP right
> > > now, just demonstrating the feasibility of merging both
> > > proposals.
> > > 
> > > My question for you: do you think the general strategy of
> > > having two constructors and two configs, one for fixed and
> > > one for variable windows, makes sense? Is it too
> > > complicated? Do you have a better idea?
> > > 
> > > Thanks!
> > > -John
> > > 
> > > On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> > > > Hi Leah,
> > > > 
> > > > Could you explain a bit more why we do not wish to
> > > > let TimeWindowedDeserializer and WindowedSerdes be created without a
> > > > specified time as a parameter?
> > > > 
> > > > I understand the Long.MAX_VALUE could cause problems, but would it not be a
> > > > good idea to have a usable default or fetch from the config if available?
> > > > After all you are proposing to add "window.size.ms"
> > > > 
> > > > We definitely need a fix to this problem, and adding "window.size.ms" makes
> > > > sense to me.
> > > > 
> > > > Thanks for the KIP,
> > > > Walker
> > > > 
> > > > On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <lt...@confluent.io> wrote:
> > > > > Hi all,
> > > > > 
> > > > > I'd like to start a discussion for KIP-659:
> > > > > 
> > > > > 
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > > > > 
> > > > > The goal of the KIP is to ensure that window size is passed to the consumer
> > > > > when needed, which will generally be for testing purposes, and to avoid
> > > > > runtime errors when the *TimeWindowedSerde* is created without a window
> > > > > size.
> > > > > 
> > > > > Looking forward to hearing your feedback.
> > > > > 
> > > > > Cheers,
> > > > > Leah
> > > > > 


Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Posted by Sophie Blee-Goldman <so...@confluent.io>.
Just want to make a quick comment on the question that John raised about
whether we should introduce separate configs for "key" and "value" window
sizes:

My short answer is no, I don't think that's necessary. First of all, as you
said, there is no first-class concept of a "windowed value" in the DSL.
Second, to engage in your rhetorical question: if there's no default window
size for a Streams program, then how can there be a sensible default for the
key AND a separate sensible default for the value?

I don't think we need to follow the existing pattern if it doesn't make
sense, and to be honest I'm a bit skeptical that anyone was even using these
default windowed inner classes, since the config wasn't even
defined/documented until quite recently. I'd actually be in favor of
deprecating StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, but I
don't want to drag that into this discussion as well.

My understanding is that these were meant to mirror the default key/value
serde configs, but the real use of the DEFAULT_WINDOWED_SERDE_INNER_CLASS
config is actually that you can at least use it to configure the inner class
for a Consumer, thus making the TimeWindowed serdes functional at a basic
level. With the window size configs, the point is not really to set a default
but to make the deserializer actually work with a Consumer, which
instantiates it by reflection. So I don't think we should position this new
config as a "default" (although it may technically behave as one); within
Streams, users can and should always supply the window size through the
constructor. I don't think that's much of an inconvenience compared to the
confusion that default serde-related configs in Streams will cause (and
already have caused).
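The reflection point can be sketched in plain Java: a client that is handed only a class cannot call a constructor with arguments, so it uses the no-arg constructor and then passes configs. This is a minimal sketch, assuming the "window.size.ms" key proposed in this thread; `ReflectiveDeserializer` and `ReflectiveLoader` are hypothetical names, and Kafka's own instantiation code differs in detail.

```java
import java.util.Map;

// Hypothetical deserializer that can only learn its window size from configs.
class ReflectiveDeserializer {
    private long windowSizeMs = -1; // -1 means "not configured yet"

    // A public no-arg constructor is what reflective instantiation relies on.
    public ReflectiveDeserializer() { }

    public void configure(Map<String, ?> configs) {
        Object size = configs.get("window.size.ms"); // proposed key (assumption)
        if (size == null) {
            throw new IllegalStateException("window.size.ms is required");
        }
        windowSizeMs = Long.parseLong(size.toString());
    }

    public long windowSizeMs() {
        return windowSizeMs;
    }
}

// Hypothetical loader mimicking a client that knows only the class.
final class ReflectiveLoader {
    private ReflectiveLoader() { }

    static ReflectiveDeserializer instantiateAndConfigure(
            Class<? extends ReflectiveDeserializer> cls, Map<String, ?> configs)
            throws ReflectiveOperationException {
        // No constructor arguments are available here, only the configs map.
        ReflectiveDeserializer instance = cls.getDeclaredConstructor().newInstance();
        instance.configure(configs);
        return instance;
    }
}
```

Without a config entry the instance has no way to learn its window size, which is why the config matters for the consumer path even if Streams code always uses the constructor.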

Regarding the fixed vs. variable-sized config, one idea I had was to just
keep the fixed-size config and constructor, and let users of enumerable
windows override the TimeWindowedSerde class(es) to do whatever they need.
IIUC, you already have to override some other window-related classes to get
variable-sized windows, so doing the same for the serdes sounds reasonable to
me. That's just my take on the "simple things should be easy, difficult things
should be possible" mantra.

One last quick side note: the reason we don't really need to discuss
SessionWindows here is that they already encode both the start and end time
of the window. This is probably the best way to go for TimeWindows as well,
but making this change in a backwards-compatible way is a much larger scope of
work. And even then, we might want to keep it possible to encode just the
start time to save space, which would require this config either way.
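To illustrate why a time-windowed deserializer needs the window size at all: if the serialized key carries only the start timestamp, the end must come from outside the bytes, whereas a session-windowed key carries both bounds. The layouts below are a simplified, hypothetical sketch (field order and the `WindowedKeyLayouts` helper are assumptions, not Kafka's exact wire format).

```java
import java.nio.ByteBuffer;

// Simplified, hypothetical layouts for windowed keys (not Kafka's exact format).
final class WindowedKeyLayouts {
    private WindowedKeyLayouts() { }

    // Time-windowed key: [inner key bytes][start: 8 bytes]; the end is not stored.
    static byte[] encodeTimeWindowed(byte[] key, long startMs) {
        return ByteBuffer.allocate(key.length + Long.BYTES)
                .put(key).putLong(startMs).array();
    }

    // Session-windowed key: [inner key bytes][start: 8 bytes][end: 8 bytes].
    static byte[] encodeSessionWindowed(byte[] key, long startMs, long endMs) {
        return ByteBuffer.allocate(key.length + 2 * Long.BYTES)
                .put(key).putLong(startMs).putLong(endMs).array();
    }

    // Time-windowed decoding needs the window size from configuration.
    static long[] decodeTimeWindowedBounds(byte[] bytes, long windowSizeMs) {
        long startMs = ByteBuffer.wrap(bytes).getLong(bytes.length - Long.BYTES);
        return new long[] {startMs, startMs + windowSizeMs};
    }

    // Session-windowed decoding needs nothing extra: both bounds are stored.
    static long[] decodeSessionWindowedBounds(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long startMs = buf.getLong(bytes.length - 2 * Long.BYTES);
        long endMs = buf.getLong(bytes.length - Long.BYTES);
        return new long[] {startMs, endMs};
    }
}
```

The start-only form saves 8 bytes per key, which is the space trade-off mentioned above.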

On Fri, Aug 21, 2020 at 9:26 AM Leah Thomas <lt...@confluent.io> wrote:

> Thanks John and Walker for your thoughts.
>
> I agree with your two scenarios, John: either you configure the class fully
> in the constructor and don't call `init()`, or you use the default
> constructor and then call `init()`. IIUC, if we pass the deserializer
> instance to the consumer, we want to make sure its window size is set using
> the newly required constructor. If we don't pass in the deserializer, the
> window size will be set through the configs. To answer Walker's question
> directly: because the configs aren't passed to the constructor, we can't set
> the window size unless we pass it to the constructor or configure the
> deserializer after instantiating it.
>
> For users who would rather not set a strict window size (outside of the
> variable-size scenario), they can pass in Long.MAX_VALUE. The way I see it,
> instead of having the default serve scenarios that don't require a window
> size, we have the default serve the scenarios that *do*, flipping the current
> implementation to match typical use cases.
>
> On your points John:
> 1. I agree that it makes sense to store it in StreamsConfig; this shouldn't
> cause any issues. I've updated the KIP accordingly.
>
> 2. The non-fixed time windows issue is a good point. It seems like calendar
> windows in particular are quite useful, so I think we want to make sure
> that this wouldn't inhibit flexibly sized windows. I think having two
> different configs and functions makes sense, although it is slightly
> messier. While requiring all time windows to use the WindowFunction
> constructor would work, I think that allowing users to access the
> WindowSize constructor is preferable because it seems easier to use for
> people who are not at all interested in delving into variably sized
> windows. This assumption could be wrong though, and perhaps users would
> adapt quickly to the new WindowFunction style, but my immediate reaction is
> to support both configs and constructors.
>
> One note on this is that Session Windows are handled separately from time
> windows and also have variable window sizes. I assume that the TimeWindowed
> option is preferable for variably sized windows because you still want to
> access the window end times? But I think one alternative could be
> separating the variably sized windows from the current implementation of
> time windows, although I think KIP-645
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> would make this not strictly necessary.
>
> Cheers,
> Leah
>
> On Fri, Aug 21, 2020 at 10:04 AM John Roesler <vv...@apache.org> wrote:
>
> > Hi Leah,
> >
> > Thanks for the KIP! This has been a real pain for some use
> > cases, so it's really good to see a proposal to fix it.
> >
> > We do need a default constructor so that it can be
> > dynamically instantiated by the consumer (or any other
> > component). But I'm +1 on deprecating the constructor you're
> > proposing to deprecate, which only partially configures the
> > class. It seems like there are exactly two patterns: either
> > you fully configure the class in the constructor and don't
> > call `init()`, or you call the default constructor and then
> > configure the class by calling `init()`.
> >
> > I can appreciate Walker's point, but stepping back, it
> > doesn't actually seem that useful to partially configure the
> > class in the constructor and then finish up the
> > configuration by calling `init()`. I could see the argument
> > if there were a sensible default, but for this particular
> > class, there isn't one. Rhetorical question: what is the
> > default window size for Streams programs?
> >
> > I have a couple of concerns to discuss:
> >
> > 1. Config Location
> >
> > I don't think I would add the new configs to ConsumerConfig,
> > but would add them to StreamsConfig instead. The deserializer
> > itself is in Streams (it is
> > o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
> > odd to have one of its configurations in a completely
> > different module.
> >
> > Also, this class already has two configs, which are in
> > StreamsConfig:
> > StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
> > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> >
> > It seems like the new config belongs right next to the
> > existing ones.
> >
> > For me, it raises a secondary question:
> > 1b: Should there be a KEY_WINDOW_SIZE and a
> > VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
> > value" even is, but the fact that we can configure serdes
> > for it implies that perhaps we should symmetrically
> > configure its size as well.
> >
> > 2. Fixed Size Assumption
> >
> > In KIP-645, I'm proposing to lift the assumption that
> > TimeWindows have a fixed size at all, but KIP-659 is
> > currently built on that assumption.
> >
> > For details on why this is not a good assumption, see:
> > https://issues.apache.org/jira/browse/KAFKA-10408
> >
> > In fact, in my POC PR for KIP-645, I'm dropping the
> > constructor that takes a "window size" parameter in favor of
> > one that takes a window function, mapping a window start
> > time to a full Window(start, end).
> >
> > In that context, it seems incongruous to introduce a
> > configuration that specifies a window size. Of course, my
> > KIP is also under discussion, so my proposal may not
> > eventually be accepted. But it is necessary to consider both
> > of these concerns together.
> >
> > One option seems to be to accept both. Namely, we keep the
> > "fixed size" constructor AND add my new constructor (for
> > variably sized windows). Likewise, we accept your proposal,
> > and KIP-659 would propose to add a new config specifying a
> > windowing function, such as:
> >
> > > StreamsConfig.WINDOW_FUNCTION_CONFIG
> >
> > which would be an instance of:
> >
> > > public interface WindowFunction extends Function<Long, Window> {}
> >
> > I'm not bringing these up for discussion in your KIP right
> > now, just demonstrating the feasibility of merging both
> > proposals.
> >
> > My question for you: do you think the general strategy of
> > having two constructors and two configs, one for fixed and
> > one for variable windows, makes sense? Is it too
> > complicated? Do you have a better idea?
> >
> > Thanks!
> > -John
> >
> > On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> > > Hi Leah,
> > >
> > > Could you explain a bit more why we do not wish to
> > > let TimeWindowedDeserializer and WindowedSerdes be created without a
> > > specified time as a parameter?
> > >
> > > I understand the Long.MAX_VALUE could cause problems, but would it not be a
> > > good idea to have a usable default or fetch from the config if available?
> > > After all you are proposing to add "window.size.ms"
> > >
> > > We definitely need a fix to this problem, and adding "window.size.ms" makes
> > > sense to me.
> > >
> > > Thanks for the KIP,
> > > Walker
> > >
> > > On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <lt...@confluent.io> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'd like to start a discussion for KIP-659:
> > > >
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > > >
> > > >
> > > > The goal of the KIP is to ensure that window size is passed to the consumer
> > > > when needed, which will generally be for testing purposes, and to avoid
> > > > runtime errors when the *TimeWindowedSerde* is created without a window
> > > > size.
> > > >
> > > > Looking forward to hearing your feedback.
> > > >
> > > > Cheers,
> > > > Leah
> > > >
> >
> >
>

Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Posted by Leah Thomas <lt...@confluent.io>.
Thanks John and Walker for your thoughts.

I agree with your two scenarios, John: either you configure the class fully in
the constructor and don't call `init()`, or you use the default constructor and
then call `init()`. IIUC, if we pass the deserializer instance to the consumer,
we want to make sure its window size is set using the newly required
constructor. If we don't pass in the deserializer, the window size will be set
through the configs. To answer Walker's question directly: because the configs
aren't passed to the constructor, we can't set the window size unless we pass
it to the constructor or configure the deserializer after instantiating it.

For users who would rather not set a strict window size (outside of the
variable-size scenario), they can pass in Long.MAX_VALUE. The way I see it,
instead of having the default serve scenarios that don't require a window
size, we have the default serve the scenarios that *do*, flipping the current
implementation to match typical use cases.
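Passing Long.MAX_VALUE interacts with Walker's concern through simple arithmetic: if a window's end is computed as start plus size, any positive start overflows to a negative end. A minimal sketch of one way to guard against that, clamping the end to Long.MAX_VALUE; `WindowEnds.endForSize` is a hypothetical helper, not Streams code.

```java
// Hypothetical helper: derive a fixed-size window's end from its start,
// clamping instead of letting start + size wrap around to a negative value.
final class WindowEnds {
    private WindowEnds() { }

    static long endForSize(long startMs, long windowSizeMs) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        try {
            // Math.addExact throws ArithmeticException on long overflow.
            return Math.addExact(startMs, windowSizeMs);
        } catch (ArithmeticException overflow) {
            return Long.MAX_VALUE; // treat the window as unbounded
        }
    }
}
```

For example, `endForSize(1_000L, Long.MAX_VALUE)` returns Long.MAX_VALUE, whereas the naive `1_000L + Long.MAX_VALUE` wraps to a negative number.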

On your points John:
1. I agree that it makes sense to store it in StreamsConfig; this shouldn't
cause any issues. I've updated the KIP accordingly.

2. The non-fixed time windows issue is a good point. It seems like calendar
windows in particular are quite useful, so I think we want to make sure
that this wouldn't inhibit flexibly sized windows. I think having two
different configs and functions makes sense, although it is slightly
messier. While requiring all time windows to use the WindowFunction
constructor would work, I think that allowing users to access the
WindowSize constructor is preferable because it seems easier to use for
people who are not at all interested in delving into variably sized
windows. This assumption could be wrong though, and perhaps users would
adapt quickly to the new WindowFunction style, but my immediate reaction is
to support both configs and constructors.
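Keeping both constructors need not mean two code paths: a fixed-size window is the special case of a window function where end = start + size, so the size-based constructor can delegate to the function-based one. A minimal sketch under that assumption; `SketchWindowResolver` and `resolveEnd` are hypothetical names, not Kafka API, and overflow handling is omitted for brevity.

```java
import java.util.function.LongUnaryOperator;

// Hypothetical class showing the two construction styles side by side.
final class SketchWindowResolver {
    // Maps a window start timestamp (ms) to the window end timestamp (ms).
    private final LongUnaryOperator startToEnd;

    // Variable-size windows: the caller supplies the mapping directly.
    SketchWindowResolver(LongUnaryOperator startToEnd) {
        this.startToEnd = startToEnd;
    }

    // Fixed-size windows: expressed as the special case end = start + size.
    SketchWindowResolver(long windowSizeMs) {
        this(start -> start + windowSizeMs);
    }

    long resolveEnd(long startMs) {
        return startToEnd.applyAsLong(startMs);
    }
}
```

Users who only need fixed windows call `new SketchWindowResolver(60_000L)` and never see the function form.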

One note on this is that Session Windows are handled separately from time
windows and also have variable window sizes. I assume that the TimeWindowed
option is preferable for variably sized windows because you still want to
access the window end times? But I think one alternative could be
separating the variably sized windows from the current implementation of
time windows, although I think KIP-645
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
would make this not strictly necessary.

Cheers,
Leah

On Fri, Aug 21, 2020 at 10:04 AM John Roesler <vv...@apache.org> wrote:

> Hi Leah,
>
> Thanks for the KIP! This has been a real pain for some use
> cases, so it's really good to see a proposal to fix it.
>
> We do need a default constructor so that it can be
> dynamically instantiated by the consumer (or any other
> component). But I'm +1 on deprecating the constructor you're
> proposing to deprecate, which only partially configures the
> class. It seems like there are exactly two patterns: either
> you fully configure the class in the constructor and don't
> call `init()`, or you call the default constructor and then
> configure the class by calling `init()`.
>
> I can appreciate Walker's point, but stepping back, it
> doesn't actually seem that useful to partially configure the
> class in the constructor and then finish up the
> configuration by calling `init()`. I could see the argument
> if there were a sensible default, but for this particular
> class, there isn't one. Rhetorical question: what is the
> default window size for Streams programs?
>
> I have a couple of concerns to discuss:
>
> 1. Config Location
>
> I don't think I would add the new configs to ConsumerConfig,
> but would add them to StreamsConfig instead. The deserializer
> itself is in Streams (it is
> o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
> odd to have one of its configurations in a completely
> different module.
>
> Also, this class already has two configs, which are in
> StreamsConfig:
> StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
> StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
>
> It seems like the new config belongs right next to the
> existing ones.
>
> For me, it raises a secondary question:
> 1b: Should there be a KEY_WINDOW_SIZE and a
> VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
> value" even is, but the fact that we can configure serdes
> for it implies that perhaps we should symmetrically
> configure its size as well.
>
> 2. Fixed Size Assumption
>
> In KIP-645, I'm proposing to lift the assumption that
> TimeWindows have a fixed size at all, but KIP-659 is
> currently built on that assumption.
>
> For details on why this is not a good assumption, see:
> https://issues.apache.org/jira/browse/KAFKA-10408
>
> In fact, in my POC PR for KIP-645, I'm dropping the
> constructor that takes a "window size" parameter in favor of
> one that takes a window function, mapping a window start
> time to a full Window(start, end).
>
> In that context, it seems incongruous to introduce a
> configuration that specifies a window size. Of course, my
> KIP is also under discussion, so my proposal may not
> eventually be accepted. But it is necessary to consider both
> of these concerns together.
>
> One option seems to be to accept both. Namely, we keep the
> "fixed size" constructor AND add my new constructor (for
> variably sized windows). Likewise, we accept your proposal,
> and KIP-659 would propose to add a new config specifying a
> windowing function, such as:
>
> > StreamsConfig.WINDOW_FUNCTION_CONFIG
>
> which would be an instance of:
>
> > public interface WindowFunction extends Function<Long, Window> {}
>
> I'm not bringing these up for discussion in your KIP right
> now, just demonstrating the feasibility of merging both
> proposals.
>
> My question for you: do you think the general strategy of
> having two constructors and two configs, one for fixed and
> one for variable windows, makes sense? Is it too
> complicated? Do you have a better idea?
>
> Thanks!
> -John
>
> On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> > Hi Leah,
> >
> > Could you explain a bit more why we do not wish to
> > let TimeWindowedDeserializer and WindowedSerdes be created without a
> > specified time as a parameter?
> >
> > I understand the Long.MAX_VALUE could cause problems, but would it not be a
> > good idea to have a usable default or fetch from the config if available?
> > After all you are proposing to add "window.size.ms"
> >
> > We definitely need a fix to this problem, and adding "window.size.ms" makes
> > sense to me.
> >
> > Thanks for the KIP,
> > Walker
> >
> > On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <lt...@confluent.io> wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a discussion for KIP-659:
> > >
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > >
> > >
> > > The goal of the KIP is to ensure that window size is passed to the consumer
> > > when needed, which will generally be for testing purposes, and to avoid
> > > runtime errors when the *TimeWindowedSerde* is created without a window
> > > size.
> > >
> > > Looking forward to hearing your feedback.
> > >
> > > Cheers,
> > > Leah
> > >
>
>

Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Posted by John Roesler <vv...@apache.org>.
Hi Leah,

Thanks for the KIP! This has been a real pain for some use
cases, so it's really good to see a proposal to fix it.

We do need a default constructor so that it can be
dynamically instantiated by the consumer (or any other
component). But I'm +1 on deprecating the constructor you're
proposing to deprecate, which only partially configures the
class. It seems like there are exactly two patterns: either
you fully configure the class in the constructor and don't
call `init()`, or you call the default constructor and then
configure the class by calling `init()`.
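The two patterns can be sketched as a class with a fully configuring constructor plus a no-arg constructor that relies on a later configuration call. This is a minimal sketch, assuming the "window.size.ms" key proposed in this thread; `SketchWindowedDeserializer` and its one-argument `configure(Map)` are hypothetical stand-ins (Kafka's `Deserializer` interface exposes `configure(Map<String, ?>, boolean)`). Rejecting a conflicting config when the constructor already set a size is this sketch's own design choice.

```java
import java.util.Map;

// Hypothetical stand-in for a time-windowed deserializer; not the Kafka class.
class SketchWindowedDeserializer {
    // Config key proposed in the KIP-659 discussion (assumption).
    static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";

    private Long windowSizeMs; // null until configured

    // Pattern 2: no-arg constructor for reflective instantiation;
    // configure(...) must be called before use.
    SketchWindowedDeserializer() { }

    // Pattern 1: fully configured up front; configure(...) is not needed.
    SketchWindowedDeserializer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    void configure(Map<String, ?> configs) {
        Object value = configs.get(WINDOW_SIZE_MS_CONFIG);
        if (windowSizeMs != null) {
            // Already set by the constructor: reject a conflicting config value.
            if (value != null && Long.parseLong(value.toString()) != windowSizeMs) {
                throw new IllegalArgumentException("window size set twice with different values");
            }
            return;
        }
        if (value == null) {
            throw new IllegalArgumentException(WINDOW_SIZE_MS_CONFIG + " must be set");
        }
        windowSizeMs = Long.parseLong(value.toString());
    }

    long windowSizeMs() {
        if (windowSizeMs == null) {
            throw new IllegalStateException("window size not configured");
        }
        return windowSizeMs;
    }
}
```

There is no meaningful "partially configured" state: an instance either received its size in the constructor or must receive it from configs before use.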

I can appreciate Walker's point, but stepping back, it
doesn't actually seem that useful to partially configure the
class in the constructor and then finish up the
configuration by calling `init()`. I could see the argument
if there were a sensible default, but for this particular
class, there isn't one. Rhetorical question: what is the
default window size for Streams programs?

I have a couple of concerns to discuss:

1. Config Location

I don't think I would add the new configs to ConsumerConfig,
but would add them to StreamsConfig instead. The deserializer
itself is in Streams (it is
o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
odd to have one of its configurations in a completely
different module.

Also, this class already has two configs, which are in
StreamsConfig:
StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS

It seems like the new config belongs right next to the
existing ones.

For me, it raises a secondary question:
1b: Should there be a KEY_WINDOW_SIZE and a
VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
value" even is, but the fact that we can configure serdes
for it implies that perhaps we should symmetrically
configure its size as well.

2. Fixed Size Assumption

In KIP-645, I'm proposing to lift the assumption that
TimeWindows have a fixed size at all, but KIP-659 is
currently built on that assumption.

For details on why this is not a good assumption, see:
https://issues.apache.org/jira/browse/KAFKA-10408

In fact, in my POC PR for KIP-645, I'm dropping the
constructor that takes a "window size" parameter in favor of
one that takes a window function, mapping a window start
time to a full Window(start, end).

In that context, it seems incongruous to introduce a
configuration that specifies a window size. Of course, my
KIP is also under discussion, so my proposal may not
eventually be accepted. But it is necessary to consider both
of these concerns together.

One option seems to be to accept both. Namely, we keep the
"fixed size" constructor AND add my new constructor (for
variably sized windows). Likewise, we accept your proposal,
and KIP-659 would propose to add a new config specifying a
windowing function, such as:

> StreamsConfig.WINDOW_FUNCTION_CONFIG

which would be an instance of:

> public interface WindowFunction extends Function<Long, Window> {}

I'm not bringing these up for discussion in your KIP right
now, just demonstrating the feasibility of merging both
proposals.
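The window-function idea can be sketched in a self-contained way: an interface that maps a window start time to a full window, with a calendar-month implementation whose size varies from month to month. All names are hypothetical; `SketchWindow` stands in for Streams' `Window`, and UTC month boundaries are an assumption made for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.function.Function;

// Hypothetical minimal window type standing in for Streams' Window.
final class SketchWindow {
    final long startMs;
    final long endMs;

    SketchWindow(long startMs, long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// Hypothetical: maps a window start timestamp (ms) to the full window.
interface WindowFunction extends Function<Long, SketchWindow> { }

final class CalendarWindows {
    private CalendarWindows() { }

    // Variable-size example: a window starting at startMs ends at the first
    // instant of the following calendar month (UTC), so sizes differ by month.
    static final WindowFunction CALENDAR_MONTH_UTC = startMs -> {
        ZonedDateTime start = Instant.ofEpochMilli(startMs).atZone(ZoneOffset.UTC);
        ZonedDateTime nextMonth = start.toLocalDate()
                .withDayOfMonth(1)
                .plusMonths(1)
                .atStartOfDay(ZoneOffset.UTC);
        return new SketchWindow(startMs, nextMonth.toInstant().toEpochMilli());
    };
}
```

A single "window.size.ms" value cannot describe this function, since January 2020 spans 31 days and February 2020 spans 29; that is the case a function-valued config would cover.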

My question for you: do you think the general strategy of
having two constructors and two configs, one for fixed and
one for variable windows, makes sense? Is it too
complicated? Do you have a better idea?

Thanks!
-John

On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> Hi Leah,
> 
> Could you explain a bit more why we do not wish to
> let TimeWindowedDeserializer and WindowedSerdes be created without a
> specified time as a parameter?
> 
> I understand the Long.MAX_VALUE could cause problems, but would it not be a
> good idea to have a usable default or fetch from the config if available?
> After all you are proposing to add "window.size.ms"
> 
> We definitely need a fix to this problem and adding "window.size.ms" makes
> sense to me.
> 
> Thanks for the KIP,
> Walker
> 
> On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <lt...@confluent.io> wrote:
> 
> > Hi all,
> > 
> > I'd like to start a discussion for KIP-659:
> > 
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > 
> > 
> > The goal of the KIP is to ensure that window size is passed to the consumer
> > when needed, which will generally be for testing purposes, and to avoid
> > runtime errors when the *TimeWindowedSerde* is created without a window
> > size.
> > 
> > Looking forward to hearing your feedback.
> > 
> > Cheers,
> > Leah
> > 


Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

Posted by Walker Carlson <wc...@confluent.io>.
Hi Leah,

Could you explain a bit more why we do not wish to
let TimeWindowedDeserializer and WindowedSerdes be created without a
specified time as a parameter?

I understand the Long.MAX_VALUE could cause problems, but would it not be a
good idea to have a usable default or fetch from the config if available?
After all you are proposing to add "window.size.ms"

We definitely need a fix to this problem and adding "window.size.ms" makes
sense to me.

Thanks for the KIP,
Walker

On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <lt...@confluent.io> wrote:

> Hi all,
>
> I'd like to start a discussion for KIP-659:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
>
>
> The goal of the KIP is to ensure that window size is passed to the consumer
> when needed, which will generally be for testing purposes, and to avoid
> runtime errors when the *TimeWindowedSerde* is created without a window
> size.
>
> Looking forward to hearing your feedback.
>
> Cheers,
> Leah
>