Posted to dev@kafka.apache.org by William Bottrell <bo...@gmail.com> on 2020/07/01 01:56:14 UTC

Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Thanks, John! I made the change. How much longer should I let there be
discussion before starting a VOTE?

On Sat, Jun 27, 2020 at 6:50 AM John Roesler <vv...@apache.org> wrote:

> Thanks, Will,
>
> That looks good to me. I would only add "cached" or something
> to indicate that it wouldn't just transparently look up the current
> System.currentTimeMillis every time.
>
> For example:
> /**
>  * Returns current cached wall-clock system timestamp in milliseconds.
>  *
>  * @return the current cached wall-clock system timestamp in milliseconds
>  */
> long currentSystemTimeMs();
>
> I don't want to give specific information about _when_ exactly the
> timestamp cache will be updated, so that we can adjust it in the
> future, but it does seem important to make people aware that they
> won't see the timestamp advance during the execution of
> Processor.process(), for example.
>
> With that modification, I'll be +1 on this proposal.
>
> Thanks again for the KIP!
> -John
>
> On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
> > Thanks, John! I appreciate you adjusting my lingo. I made the change to
> the
> > KIP. I will add the note about system time to the javadoc.
> >
> > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <vv...@apache.org>
> wrote:
> >
> > > Hi Will,
> > >
> > > This proposal looks good to me overall. Thanks for the contribution!
> > >
> > > Just a couple of minor notes:
> > >
> > > The system time method would return a cached timestamp that Streams
> looks
> > > up once when it starts processing a record. This may be confusing, so
> it
> > > might be good to state it in the javadoc.
> > >
> > > I thought the javadoc for the stream time might be a bit confusing. We
> > > normally talk about “Tasks” not “partition groups” in the public api.
> Maybe
> > > just saying that it’s “the maximum timestamp of any record yet
> processed by
> > > the task” would be both high level and accurate.
> > >
> > > Thanks again!
> > > -John
> > >
> > > On Mon, Jun 22, 2020, at 02:10, William Bottrell wrote:
> > > > Thanks, Bruno. I updated the KIP, so hopefully it makes more sense.
> > > Thanks
> > > > to Matthias J. Sax and Piotr Smolinski for helping with details.
> > > >
> > > > I welcome more feedback. Let me know if something doesn't make sense
> or I
> > > > need to provide more detail. Also, feel free to enlighten me. Thanks!
> > > >
> > > > On Thu, Jun 11, 2020 at 1:11 PM Bruno Cadonna <br...@confluent.io>
> > > wrote:
> > > >
> > > > > Hi Will,
> > > > >
> > > > > Thank you for the KIP.
> > > > >
> > > > > 1. Could you elaborate a bit more on the motivation in the KIP? An
> > > > > example would make the motivation clearer.
> > > > >
> > > > > 2. In section "Proposed Changes" you do not need to show the
> > > > > implementation and describe internals. A description of the
> expected
> > > > > behavior of the newly added methods should suffice.
> > > > >
> > > > > 3. In "Compatibility, Deprecation, and Migration Plan" you should
> > > > > state that the change is backward compatible because the two
> methods
> > > > > will be added and no other method will be changed or removed.
> > > > >
> > > > > Best,
> > > > > Bruno
> > > > >
> > > > > On Wed, Jun 10, 2020 at 10:06 AM William Bottrell <
> bottrellw@gmail.com
> > > >
> > > > > wrote:
> > > > > >
> > > > > > Add currentSystemTimeMs and currentStreamTimeMs to
> ProcessorContext
> > > > > > <
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext
> > > > > >
> > > > > >
> > > > > > I am extremely new to Kafka, but thank you to John Roesler and
> > > Matthias
> > > > > J.
> > > > > > Sax for pointing me in the right direction. I accept any and all
> > > > > feedback.
> > > > > >
> > > > > > Thanks,
> > > > > > Will
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Posted by John Roesler <vv...@apache.org>.
Thanks for the reply, Matthias,

I see what you mean. I suppose I was thinking that we would pass in the cached system time, which is also what we’re proposing to add to the ProcessorContext.

If you think there’s something about the timestamp extractor in particular that would make people want more precision, then something like Time would do the trick. Since it’s not a public API, maybe just ‘Supplier<Long>’ would be appropriate.
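
As a rough illustration of the `Supplier<Long>` idea, the sketch below uses a hypothetical `ClockAwareExtractor` interface, with a generic record type standing in for `ConsumerRecord<Object, Object>`. Neither name exists in Kafka Streams, and this is not the three-argument signature proposed earlier in the thread; it only shows that the clock is read on demand, so a test can hand in a fixed value.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the extractor signature discussed here; the real
// TimestampExtractor takes a ConsumerRecord and has no clock parameter.
interface ClockAwareExtractor<R> {
    long extract(R record, long partitionTime, Supplier<Long> systemTime);
}

// Wall-clock style extractor: it reads the supplied clock instead of calling
// System.currentTimeMillis() directly, so the caller controls the time source.
final class WallclockLikeExtractor<R> implements ClockAwareExtractor<R> {
    @Override
    public long extract(R record, long partitionTime, Supplier<Long> systemTime) {
        return systemTime.get();
    }
}
```

Under these assumptions, production code could pass `System::currentTimeMillis` (or a cached value), while a test passes a lambda returning a constant.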

But I also don’t want to bikeshed it. My only concern was that it’s awkward to ask people to actually change their application code for testing. But maybe in this case, an option is better than no option, and if people don’t like it, we can always deprecate the mock extractor and evolve the interface later. 

So, I’m +1 either way.

Thanks,
John

On Mon, Aug 3, 2020, at 16:28, Matthias J. Sax wrote:
> Interesting proposal.
> 
> However, it raises the question how the runtime would pass in the
> `systemTime` parameter? To be accurate, we would need to call
> `Time.milliseconds()` each time before we call the timestamp extractor.
> This sounds expensive and maybe the extractor does not even use this value.
> 
> Or we only call `Time.milliseconds()` periodically (as we also do in our
> runtime code) to make it cheap, however, we lose precision? Not sure if
> we can make this trade-off for the user?
> 
> Handing in the `Time` object itself might be another idea, however it
> seems "dangerous" though, as it does not seem to be actually public API?
> 
> Last, do we really think we need this feature? We never had a feature
> request for it and I am not aware of any issue with the current
> TimestampExtractor interface.
> 
> It's always easier to add it later if there is real demand instead of
> pro-actively changing it (and maybe the need to deprecate and remove
> later) with no clear benefit? Adding the `MockTimestampsExtractor` as
> part of the test-utils package seems less "dangerous" and should do the
> job, allowing us to collect feedback. If it's not good enough, we can
> still change the TimestampExtractor interface as a follow up?
> 
> 
> -Matthias
> 
> On 7/28/20 10:03 AM, John Roesler wrote:
> > Thanks Matthias,
> > 
> > This is a really good point. It might be a bummer
> > to have to actually change the topology between
> > testing and production. Do you think we can rather
> > evolve the TimestampExtractor interface to let
> > Streams pass in the current system time, along with
> > the current record and the current partition time?
> > 
> > For example, we could add a new method:
> > long extract(
> >   ConsumerRecord<Object, Object> record, 
> >   long partitionTime,
> >   long systemTime
> > );
> > 
> > Then, Streams could pass in the current system 
> > time and TopologyTestDriver could pass the mocked
> > time. Additionally, users who implement
> > TimestampExtractor on their own would be able to
> > deterministically unit-test their own implementation.
> > 
> > It's always a challenge to add to an interface without
> > breaking compatibility. In this case, it seems like
> > we could provide a default implementation that just
> > ignores the systemTime argument and calls
> > extract(record,  partitionTime) and also deprecate
> > the existing method. Then custom implementations
> > would get a deprecation warning telling them to
> > implement the other method, and when we remove
> > the deprecated extract(record, partitionTime), we can
> > also drop the default implementation from the new
> > method.
> > 
> > Specifically, what do you think about:
> > =================================
> > public interface TimestampExtractor {
> >     /*...
> >      * @deprecated Since 2.7 Implement
> >      *   {@code extract(ConsumerRecord<Object, Object> record, long partitionTime, long systemTime)} instead
> >      */
> >     @Deprecated
> >     long extract(
> >       ConsumerRecord<Object, Object> record,
> >       long partitionTime
> >     );
> > 
> >     default long extract(
> >       ConsumerRecord<Object, Object> record,
> >       long partitionTime,
> >       long systemTime) {
> >         return extract(record, partitionTime);
> >     }
> > }
> > =================================
> > 
> > Thanks,
> > -John
> > 
> > On Sun, Jul 26, 2020, at 15:47, Matthias J. Sax wrote:
> >> Hi,
> >>
> >> I just had one more thought about an additional improvement we might
> >> want to include in this KIP.
> >>
> >> Kafka Streams ships with a `WallclockTimestampExtractor` that just
> >> returns `System.currentTimeMillis()` and thus, cannot be mocked. And it
> >> seems that there is no way for a user to build a custom timestamps
> >> extractor that returns the TTD's mocked system time.
> >>
> >> Thus, it might be nice, to add a `MockTimeExtractor` (only in the
> >> test-util package) that users could set and this `MockTimeExtractor`
> >> returns the mocked system time.
> >>
> >> Thoughts?
> >>
> >>
> >> -Matthias
> >>
> >> On 7/7/20 11:11 PM, Matthias J. Sax wrote:
> >>> I think, we don't need a default implementation for the new methods.
> >>>
> >>> What would be the use-case to implement the  `ProcessorContext`
> >>> interface? In contrast to, for example, `KeyValueStore`,
> >>> `ProcessorContext` is a use-only interface because it's never passed
> >>> into Kafka Streams, but only handed out to the user.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 7/7/20 1:28 PM, William Bottrell wrote:
> >>>> Sure, I would appreciate help from Piotr creating an example.
> >>>>
> >>>> On Tue, Jul 7, 2020 at 12:03 PM Boyang Chen <re...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hey John,
> >>>>>
> >>>>> since ProcessorContext is a public API, I couldn't be sure that people
> >>>>> won't try to extend it. Without a default implementation, user code
> >>>>> compilation will break.
> >>>>>
> >>>>> William and Piotr, it seems that we haven't added any example usage of the
> >>>>> new API, could we try to address that? It should help with the motivation
> >>>>> and follow-up meta comments as John proposed.
> >>>>>
> >>>>> Boyang
> >>>>>
> >>>>> On Mon, Jul 6, 2020 at 12:04 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>
> >>>>>> William,
> >>>>>>
> >>>>>> thanks for the KIP. LGTM. Feel free to start a vote.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 7/4/20 10:14 AM, John Roesler wrote:
> >>>>>>> Hi Richard,
> >>>>>>>
> >>>>>>> It’s good to hear from you!
> >>>>>>>
> >>>>>>> Thanks for bringing up the wall-clock suppression feature. IIRC,
> >>>>> someone
> >>>>>> actually started a KIP discussion for it already, but I don’t think it
> >>>>> went
> >>>>>> to a vote. I don’t recall any technical impediment, just the lack of
> >>>>>> availability to finish it up. Although there is some association, it
> >>>>> would
> >>>>>> be good to keep the KIPs separate.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> John
> >>>>>>>
> >>>>>>> On Sat, Jul 4, 2020, at 10:05, Richard Yu wrote:
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> This reminds me of a previous issue I think that we were discussing.
> >>>>>>>> @John Roesler <ma...@apache.org> I think you should
> >>>>> remember
> >>>>>> this one.
> >>>>>>>>
> >>>>>>>> A while back, we were talking about having suppress operator emit
> >>>>>>>> records by wall-clock time instead of stream time.
> >>>>>>>> If we are adding this, wouldn't that make it more feasible for us to
> >>>>>>>> implement that feature for suppression?
> >>>>>>>>
> >>>>>>>> If I recall correctly, there actually had been quite a bit of user
> >>>>>>>> demand for such a feature.
> >>>>>>>> Might be good to include it in this KIP (or maybe use one of the prior
> >>>>>>>> KIPs for this feature).
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Richard
> >>>>>>>>
> >>>>>>>> On Sat, Jul 4, 2020 at 6:58 AM John Roesler <vv...@apache.org>
> >>>>>> wrote:
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>>  1. Thanks, Boyang, it is nice to see usage examples in KIPs like
> >>>>>> this. It helps during the discussion, and it’s also good documentation
> >>>>>> later on.
> >>>>>>>>>
> >>>>>>>>>  2. Yeah, this is a subtle point. The motivation mentions being able
> >>>>>> to control the time during tests, but to be able to make it work, the
> >>>>>> processor implementation needs a public method on ProcessorContext to get
> >>>>>> the time. Otherwise, processors would have to check the type of the
> >>>>> context
> >>>>>> and cast, depending on whether they’re running inside a test or not. In
> >>>>>> retrospect, if we’d had a usage example, this probably would have been
> >>>>>> clear.
> >>>>>>>>>
> >>>>>>>>>  3. I don’t think we expect people to have their own implementations
> >>>>>> of ProcessorContext. Since all implementations are internal, it’s really
> >>>>> an
> >>>>>> implementation detail whether we use a default method, abstract methods,
> >>>>> or
> >>>>>> concrete methods. I can’t think of an implementation that really wants to
> >>>>>> just look up the system time. In the production code path, we cache the
> >>>>>> time for performance, and in testing, we use a mock time.
> >>>>>>>>>
> >>>>>>>>>  Thanks,
> >>>>>>>>>  John
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>  On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
> >>>>>>>>>  > 1. Makes sense; let me propose something
> >>>>>>>>>  >
> >>>>>>>>>  > 2. That's not testing-only. The goal is to use the same API to
> >>>>>> access
> >>>>>>>>>  > the time
> >>>>>>>>>  > in deployment and testing environments. The major driver is
> >>>>>>>>>  > System.currentTimeMillis(),
> >>>>>>>>>  > which a) cannot be used in tests b) could go in specific cases
> >>>>> back
> >>>>>> c)
> >>>>>>>>>  > is not compatible
> >>>>>>>>>  > with punctuator call. The idea is that we could access clock using
> >>>>>>>>>  > uniform API.
> >>>>>>>>>  > For completeness we should have same API for system and stream
> >>>>> time.
> >>>>>>>>>  >
> >>>>>>>>>  > 3. There aren't that many subclasses. Two important ones are
> >>>>>>>>>  > ProcessorContextImpl and
> >>>>>>>>>  > MockProcessorContext (and third one:
> >>>>>>>>>  > ForwardingDisableProcessorContext). If given
> >>>>>>>>>  > implementation does not support schedule() call, there is no
> >>>>> reason
> >>>>>> to
> >>>>>>>>>  > support clock access.
> >>>>>>>>>  > The default implementation should just throw
> >>>>>>>>>  > UnsupportedOperationException just to prevent
> >>>>>>>>>  > from compilation errors in possible subclasses.
> >>>>>>>>>  >
> >>>>>>>>>  > On 2020/07/01 02:24:43, Boyang Chen <re...@gmail.com>
> >>>>>> wrote:
> >>>>>>>>>  > > Thanks Will for the KIP. A couple questions and suggestions:
> >>>>>>>>>  > >
> >>>>>>>>>  > > 1. I think for new APIs to make most sense, we should add a
> >>>>>> minimal example
> >>>>>>>>>  > > demonstrating how it could be useful to structure unit tests w/o
> >>>>>> the new
> >>>>>>>>>  > > APIs.
> >>>>>>>>>  > > 2. If this is a testing-only feature, could we only add it
> >>>>>>>>>  > > to MockProcessorContext?
> >>>>>>>>>  > > 3. Regarding the API, since this will be added to the
> >>>>>> ProcessorContext with
> >>>>>>>>>  > > many subclasses, does it make sense to provide default
> >>>>>> implementations as
> >>>>>>>>>  > > well?
> >>>>>>>>>  > >
> >>>>>>>>>  > > Boyang
> >>>>>>>>>  > >
> >>>>>>>>>  > > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <
> >>>>>> bottrellw@gmail.com>
> >>>>>>>>>  > > wrote:
> >>>>>>>>>  > >
> >>>>>>>>>  > > > Thanks, John! I made the change. How much longer should I let
> >>>>>> there be
> >>>>>>>>>  > > > discussion before starting a VOTE?
> >>>>>>>>>  > > >
> >>>>>>>>>  > > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler <
> >>>>>> vvcephei@apache.org> wrote:
> >>>>>>>>>  > > >
> >>>>>>>>>  > > > > Thanks, Will,
> >>>>>>>>>  > > > >
> >>>>>>>>>  > > > > That looks good to me. I would only add "cached" or
> >>>>> something
> >>>>>>>>>  > > > > to indicate that it wouldn't just transparently look up the
> >>>>>> current
> >>>>>>>>>  > > > > System.currentTimeMillis every time.
> >>>>>>>>>  > > > >
> >>>>>>>>>  > > > > For example:
> >>>>>>>>>  > > > > /**
> >>>>>>>>>  > > > > * Returns current cached wall-clock system timestamp in
> >>>>>> milliseconds.
> >>>>>>>>>  > > > > *
> >>>>>>>>>  > > > > * @return the current cached wall-clock system timestamp in
> >>>>>> milliseconds
> >>>>>>>>>  > > > > */
> >>>>>>>>>  > > > > long currentSystemTimeMs();
> >>>>>>>>>  > > > >
> >>>>>>>>>  > > > > I don't want to give specific information about _when_
> >>>>>> exactly the
> >>>>>>>>>  > > > > timestamp cache will be updated, so that we can adjust it in
> >>>>>> the
> >>>>>>>>>  > > > > future, but it does seem important to make people aware that
> >>>>>> they
> >>>>>>>>>  > > > > won't see the timestamp advance during the execution of
> >>>>>>>>>  > > > > Processor.process(), for example.
> >>>>>>>>>  > > > >
> >>>>>>>>>  > > > > With that modification, I'll be +1 on this proposal.
> >>>>>>>>>  > > > >
> >>>>>>>>>  > > > > Thanks again for the KIP!
> >>>>>>>>>  > > > > -John
> >>>>>>>>>  > > > >
> >>>>>>>>>  > > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
> >>>>>>>>>  > > > > > Thanks, John! I appreciate you adjusting my lingo. I made
> >>>>>> the change to
> >>>>>>>>>  > > > > the
> >>>>>>>>>  > > > > > KIP. I will add the note about system time to the javadoc.
> >>>>>>>>>  > > > > >
> >>>>>>>>>  > > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <
> >>>>>> vvcephei@apache.org>
> >>>>>>>>>  > > > > wrote:
> >>>>>>>>>  > > > > >
> >>>>>>>>>  > > > > > > Hi Will,
> >>>>>>>>>  > > > > > >
> >>>>>>>>>  > > > > > > This proposal looks good to me overall. Thanks for the
> >>>>>> contribution!
> >>>>>>>>>  > > > > > >
> >>>>>>>>>  > > > > > > Just a couple of minor notes:
> >>>>>>>>>  > > > > > >
> >>>>>>>>>  > > > > > > The system time method would return a cached timestamp
> >>>>>> that Streams
> >>>>>>>>>  > > > > looks
> >>>>>>>>>  > > > > > > up once when it starts processing a record. This may be
> >>>>>> confusing, so
> >>>>>>>>>  > > > > it
> >>>>>>>>>  > > > > > > might be good to state it in the javadoc.
> >>>>>>>>>  > > > > > >
> >>>>>>>>>  > > > > > > I thought the javadoc for the stream time might be a bit
> >>>>>> confusing.
> >>>>>>>>>  > > > We
> >>>>>>>>>  > > > > > > normally talk about “Tasks” not “partition groups” in
> >>>>> the
> >>>>>> public api.
> >>>>>>>>>  > > > > Maybe
> >>>>>>>>>  > > > > > > just saying that it’s “the maximum timestamp of any
> >>>>>> record yet
> >>>>>>>>>  > > > > processed by
> >>>>>>>>>  > > > > > > the task” would be both high level and accurate.
> >>>>>>>>>  > > > > > >
> >>>>>>>>>  > > > > > > Thanks again!
> >>>>>>>>>  > > > > > > -John
> >>>>>>>>>  > > > > > >
> >>>>>>>>>  > > > > > > On Mon, Jun 22, 2020, at 02:10, William Bottrell wrote:
> >>>>>>>>>  > > > > > > > Thanks, Bruno. I updated the KIP, so hopefully it
> >>>>> makes
> >>>>>> more sense.
> >>>>>>>>>  > > > > > > Thanks
> >>>>>>>>>  > > > > > > > to Matthias J. Sax and Piotr Smolinski for helping
> >>>>> with
> >>>>>> details.
> >>>>>>>>>  > > > > > > >
> >>>>>>>>>  > > > > > > > I welcome more feedback. Let me know if something
> >>>>>> doesn't make
> >>>>>>>>>  > > > sense
> >>>>>>>>>  > > > > or I
> >>>>>>>>>  > > > > > > > need to provide more detail. Also, feel free to
> >>>>>> enlighten me.
> >>>>>>>>>  > > > Thanks!
> >>>>>>>>>  > > > > > > >
> >>>>>>>>>  > > > > > > > On Thu, Jun 11, 2020 at 1:11 PM Bruno Cadonna <
> >>>>>> bruno@confluent.io>
> >>>>>>>>>  > > > > > > wrote:
> >>>>>>>>>  > > > > > > >
> >>>>>>>>>  > > > > > > > > Hi Will,
> >>>>>>>>>  > > > > > > > >
> >>>>>>>>>  > > > > > > > > Thank you for the KIP.
> >>>>>>>>>  > > > > > > > >
> >>>>>>>>>  > > > > > > > > 1. Could you elaborate a bit more on the motivation
> >>>>>> in the KIP?
> >>>>>>>>>  > > > An
> >>>>>>>>>  > > > > > > > > example would make the motivation clearer.
> >>>>>>>>>  > > > > > > > >
> >>>>>>>>>  > > > > > > > > 2. In section "Proposed Changes" you do not need to
> >>>>>> show the
> >>>>>>>>>  > > > > > > > > implementation and describe internals. A description
> >>>>>> of the
> >>>>>>>>>  > > > > expected
> >>>>>>>>>  > > > > > > > > behavior of the newly added methods should suffice.
> >>>>>>>>>  > > > > > > > >
> >>>>>>>>>  > > > > > > > > 3. In "Compatibility, Deprecation, and Migration
> >>>>>> Plan" you should
> >>>>>>>>>  > > > > > > > > state that the change is backward compatible because
> >>>>>> the two
> >>>>>>>>>  > > > > methods
> >>>>>>>>>  > > > > > > > > will be added and no other method will be changed or
> >>>>>> removed.
> >>>>>>>>>  > > > > > > > >
> >>>>>>>>>  > > > > > > > > Best,
> >>>>>>>>>  > > > > > > > > Bruno
> >>>>>>>>>  > > > > > > > >
> >>>>>>>>>  > > > > > > > > On Wed, Jun 10, 2020 at 10:06 AM William Bottrell <
> >>>>>>>>>  > > > > bottrellw@gmail.com
> >>>>>>>>>  > > > > > > >
> >>>>>>>>>  > > > > > > > > wrote:
> >>>>>>>>>  > > > > > > > > >
> >>>>>>>>>  > > > > > > > > > Add currentSystemTimeMs and currentStreamTimeMs to
> >>>>>>>>>  > > > > ProcessorContext
> >>>>>>>>>  > > > > > > > > > <
> >>>>>>>>>  > > > > > > > >
> >>>>>>>>>  > > > > > >
> >>>>>>>>>  > > > >
> >>>>>>>>>  > > >
> >>>>>>
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext
> >>>>>>>>>  > > > > > > > > >
> >>>>>>>>>  > > > > > > > > >
> >>>>>>>>>  > > > > > > > > > I am extremely new to Kafka, but thank you to John
> >>>>>> Roesler and
> >>>>>>>>>  > > > > > > Matthias
> >>>>>>>>>  > > > > > > > > J.
> >>>>>>>>>  > > > > > > > > > Sax for pointing me in the right direction. I
> >>>>>> accept any and
> >>>>>>>>>  > > > all
> >>>>>>>>>  > > > > > > > > feedback.
> >>>>>>>>>  > > > > > > > > >
> >>>>>>>>>  > > > > > > > > > Thanks,
> >>>>>>>>>  > > > > > > > > > Will
> >>>>>>>>>  > > > > > > > >
> >>>>>>>>>  > > > > > > >
> >>>>>>>>>  > > > > > >
> >>>>>>>>>  > > > > >
> >>>>>>>>>  > > > >
> >>>>>>>>>  > > >
> >>>>>>>>>  > >
> >>>>>>>>>  >
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>

Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Posted by "Matthias J. Sax" <mj...@apache.org>.
Interesting proposal.

However, it raises the question of how the runtime would pass in the
`systemTime` parameter. To be accurate, we would need to call
`Time.milliseconds()` each time before we call the timestamp extractor.
This sounds expensive, and the extractor might not even use this value.

Or we could call `Time.milliseconds()` only periodically (as we also do in
our runtime code) to make it cheap; however, we would lose precision. I am
not sure we can make this trade-off for the user.
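
To make the trade-off concrete, here is a minimal sketch of the periodic-lookup approach. `CachedClock` and its `refresh()` method are hypothetical names for illustration, not Kafka Streams classes, and the sketch makes no claim about when the real runtime updates its cached time.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not part of Kafka Streams: caches a wall-clock reading
// and only refreshes it when refresh() is called.
final class CachedClock {
    private final LongSupplier source;
    private long cachedMs;

    CachedClock(LongSupplier source) {
        this.source = source;
        this.cachedMs = source.getAsLong();
    }

    // Cheap read: returns the cached value without touching the underlying clock.
    long currentSystemTimeMs() {
        return cachedMs;
    }

    // The caller decides when to pay for a real clock lookup.
    void refresh() {
        cachedMs = source.getAsLong();
    }
}
```

Between two `refresh()` calls every reader sees the same value, which is cheap but means the reported time can lag the real clock by up to the refresh interval.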

Handing in the `Time` object itself might be another idea; however, that
seems "dangerous", as `Time` does not appear to be public API.

Last, do we really think we need this feature? We never had a feature
request for it and I am not aware of any issue with the current
TimestampExtractor interface.

It's always easier to add it later if there is real demand than to
proactively change it (and maybe need to deprecate and remove it later)
with no clear benefit. Adding the `MockTimestampsExtractor` as part of
the test-utils package seems less "dangerous" and should do the job,
allowing us to collect feedback. If it's not good enough, we can still
change the TimestampExtractor interface as a follow-up.


-Matthias

On 7/28/20 10:03 AM, John Roesler wrote:
> Thanks Matthias,
> 
> This is a really good point. It might be a bummer
> to have to actually change the topology between
> testing and production. Do you think we can rather
> evolve the TimestampExtractor interface to let
> Streams pass in the current system time, along with
> the current record and the current partition time?
> 
> For example, we could add a new method:
> long extract(
>   ConsumerRecord<Object, Object> record, 
>   long partitionTime,
>   long systemTime
> );
> 
> Then, Streams could pass in the current system 
> time and TopologyTestDriver could pass the mocked
> time. Additionally, users who implement
> TimestampExtractor on their own would be able to
> deterministically unit-test their own implementation.
> 
> It's always a challenge to add to an interface without
> breaking compatibility. In this case, it seems like
> we could provide a default implementation that just
> ignores the systemTime argument and calls
> extract(record,  partitionTime) and also deprecate
> the existing method. Then custom implementations
> would get a deprecation warning telling them to
> implement the other method, and when we remove
> the deprecated extract(record, partitionTime), we can
> also drop the default implementation from the new
> method.
> 
> Specifically, what do you think about:
> =================================
> public interface TimestampExtractor {
>     /*...
>      * @deprecated Since 2.7 Implement
>      *   {@code extract(ConsumerRecord<Object, Object> record, long partitionTime, long systemTime)} instead
>      */
>     @Deprecated
>     long extract(
>       ConsumerRecord<Object, Object> record,
>       long partitionTime
>     );
> 
>     default long extract(
>       ConsumerRecord<Object, Object> record,
>       long partitionTime,
>       long systemTime) {
>         return extract(record, partitionTime);
>     }
> }
> =================================
> 
> Thanks,
> -John
> 
> On Sun, Jul 26, 2020, at 15:47, Matthias J. Sax wrote:
>> Hi,
>>
>> I just had one more thought about an additional improvement we might
>> want to include in this KIP.
>>
>> Kafka Streams ships with a `WallclockTimestampExtractor` that just
>> returns `System.currentTimeMillis()` and thus, cannot be mocked. And it
>> seems that there is no way for a user to build a custom timestamps
>> extractor that returns the TTD's mocked system time.
>>
>> Thus, it might be nice, to add a `MockTimeExtractor` (only in the
>> test-util package) that users could set and this `MockTimeExtractor`
>> returns the mocked system time.
>>
>> Thoughts?
>>
>>
>> -Matthias
>>
>> On 7/7/20 11:11 PM, Matthias J. Sax wrote:
>>> I think, we don't need a default implementation for the new methods.
>>>
>>> What would be the use-case to implement the  `ProcessorContext`
>>> interface? In contrast to, for example, `KeyValueStore`,
>>> `ProcessorContext` is a use-only interface because it's never passed
>>> into Kafka Streams, but only handed out to the user.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 7/7/20 1:28 PM, William Bottrell wrote:
>>>> Sure, I would appreciate help from Piotr creating an example.
>>>>
>>>> On Tue, Jul 7, 2020 at 12:03 PM Boyang Chen <re...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey John,
>>>>>
>>>>> since ProcessorContext is a public API, I couldn't be sure that people
>>>>> won't try to extend it. Without a default implementation, user code
>>>>> compilation will break.
>>>>>
>>>>> William and Piotr, it seems that we haven't added any example usage of the
>>>>> new API, could we try to address that? It should help with the motivation
>>>>> and follow-up meta comments as John proposed.
>>>>>
>>>>> Boyang
>>>>>
>>>>> On Mon, Jul 6, 2020 at 12:04 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>
>>>>>> William,
>>>>>>
>>>>>> thanks for the KIP. LGTM. Feel free to start a vote.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 7/4/20 10:14 AM, John Roesler wrote:
>>>>>>> Hi Richard,
>>>>>>>
>>>>>>> It’s good to hear from you!
>>>>>>>
>>>>>>> Thanks for bringing up the wall-clock suppression feature. IIRC,
>>>>> someone
>>>>>> actually started a KIP discussion for it already, but I don’t think it
>>>>> went
>>>>>> to a vote. I don’t recall any technical impediment, just the lack of
>>>>>> availability to finish it up. Although there is some association, it
>>>>> would
>>>>>> be good to keep the KIPs separate.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> John
>>>>>>>
>>>>>>> On Sat, Jul 4, 2020, at 10:05, Richard Yu wrote:
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> This reminds me of a previous issue I think that we were discussing.
>>>>>>>> @John Roesler <ma...@apache.org> I think you should
>>>>> remember
>>>>>> this one.
>>>>>>>>
>>>>>>>> A while back, we were talking about having suppress operator emit
>>>>>>>> records by wall-clock time instead of stream time.
>>>>>>>> If we are adding this, wouldn't that make it more feasible for us to
>>>>>>>> implement that feature for suppression?
>>>>>>>>
>>>>>>>> If I recall correctly, there actually had been quite a bit of user
>>>>>>>> demand for such a feature.
>>>>>>>> Might be good to include it in this KIP (or maybe use one of the prior
>>>>>>>> KIPs for this feature).
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Richard
>>>>>>>>
>>>>>>>> On Sat, Jul 4, 2020 at 6:58 AM John Roesler <vv...@apache.org>
>>>>>> wrote:
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>>  1. Thanks, Boyang, it is nice to see usage examples in KIPs like
>>>>>> this. It helps during the discussion, and it’s also good documentation
>>>>>> later on.
>>>>>>>>>
>>>>>>>>>  2. Yeah, this is a subtle point. The motivation mentions being able
>>>>>> to control the time during tests, but to be able to make it work, the
>>>>>> processor implementation needs a public method on ProcessorContext to get
>>>>>> the time. Otherwise, processors would have to check the type of the
>>>>> context
>>>>>> and cast, depending on whether they’re running inside a test or not. In
>>>>>> retrospect, if we’d had a usage example, this probably would have been
>>>>>> clear.
>>>>>>>>>
>>>>>>>>>  3. I don’t think we expect people to have their own implementations
>>>>>> of ProcessorContext. Since all implementations are internal, it’s really
>>>>> an
>>>>>> implementation detail whether we use a default method, abstract methods,
>>>>> or
>>>>>> concrete methods. I can’t think of an implementation that really wants to
>>>>>> just look up the system time. In the production code path, we cache the
>>>>>> time for performance, and in testing, we use a mock time.
>>>>>>>>>
>>>>>>>>>  Thanks,
>>>>>>>>>  John
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
>>>>>>>>>  > 1. Makes sense; let me propose something
>>>>>>>>>  >
>>>>>>>>>  > 2. That's not testing-only. The goal is to use the same API to
>>>>>> access
>>>>>>>>>  > the time
>>>>>>>>>  > in deployment and testing environments. The major driver is
>>>>>>>>>  > System.currentTimeMillis(),
>>>>>>>>>  > which a) cannot be used in tests b) could go in specific cases
>>>>> back
>>>>>> c)
>>>>>>>>>  > is not compatible
>>>>>>>>>  > with punctuator call. The idea is that we could access clock using
>>>>>>>>>  > uniform API.
>>>>>>>>>  > For completeness we should have same API for system and stream
>>>>> time.
>>>>>>>>>  >
>>>>>>>>>  > 3. There aren't that many subclasses. Two important ones are
>>>>>>>>>  > ProcessorContextImpl and
>>>>>>>>>  > MockProcessorContext (and third one:
>>>>>>>>>  > ForwardingDisableProcessorContext). If given
>>>>>>>>>  > implementation does not support schedule() call, there is no
>>>>> reason
>>>>>> to
>>>>>>>>>  > support clock access.
>>>>>>>>>  > The default implementation should just throw
>>>>>>>>>  > UnsupportedOperationException just to prevent
>>>>>>>>>  > from compilation errors in possible subclasses.
>>>>>>>>>  >
>>>>>>>>>  > On 2020/07/01 02:24:43, Boyang Chen <re...@gmail.com>
>>>>>> wrote:
>>>>>>>>>  > > Thanks Will for the KIP. A couple questions and suggestions:
>>>>>>>>>  > >
>>>>>>>>>  > > 1. I think for new APIs to make most sense, we should add a
>>>>>> minimal example
>>>>>>>>>  > > demonstrating how it could be useful to structure unit tests w/o
>>>>>> the new
>>>>>>>>>  > > APIs.
>>>>>>>>>  > > 2. If this is a testing-only feature, could we only add it
>>>>>>>>>  > > to MockProcessorContext?
>>>>>>>>>  > > 3. Regarding the API, since this will be added to the
>>>>>> ProcessorContext with
>>>>>>>>>  > > many subclasses, does it make sense to provide default
>>>>>> implementations as
>>>>>>>>>  > > well?
>>>>>>>>>  > >
>>>>>>>>>  > > Boyang
>>>>>>>>>  > >
>>>>>>>>>  > > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <
>>>>>> bottrellw@gmail.com>
>>>>>>>>>  > > wrote:
>>>>>>>>>  > >
>>>>>>>>>  > > > Thanks, John! I made the change. How much longer should I let
>>>>>> there be
>>>>>>>>>  > > > discussion before starting a VOTE?
>>>>>>>>>  > > >
>>>>>>>>>  > > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler <
>>>>>> vvcephei@apache.org> wrote:
>>>>>>>>>  > > >
>>>>>>>>>  > > > > Thanks, Will,
>>>>>>>>>  > > > >
>>>>>>>>>  > > > > That looks good to me. I would only add "cached" or
>>>>> something
>>>>>>>>>  > > > > to indicate that it wouldn't just transparently look up the
>>>>>> current
>>>>>>>>>  > > > > System.currentTimeMillis every time.
>>>>>>>>>  > > > >
>>>>>>>>>  > > > > For example:
>>>>>>>>>  > > > > /**
>>>>>>>>>  > > > > * Returns current cached wall-clock system timestamp in
>>>>>> milliseconds.
>>>>>>>>>  > > > > *
>>>>>>>>>  > > > > * @return the current cached wall-clock system timestamp in
>>>>>> milliseconds
>>>>>>>>>  > > > > */
>>>>>>>>>  > > > > long currentSystemTimeMs();
>>>>>>>>>  > > > >
>>>>>>>>>  > > > > I don't want to give specific information about _when_
>>>>>> exactly the
>>>>>>>>>  > > > > timestamp cache will be updated, so that we can adjust it in
>>>>>> the
>>>>>>>>>  > > > > future, but it does seem important to make people aware that
>>>>>> they
>>>>>>>>>  > > > > won't see the timestamp advance during the execution of
>>>>>>>>>  > > > > Processor.process(), for example.
>>>>>>>>>  > > > >
>>>>>>>>>  > > > > With that modification, I'll be +1 on this proposal.
>>>>>>>>>  > > > >
>>>>>>>>>  > > > > Thanks again for the KIP!
>>>>>>>>>  > > > > -John
>>>>>>>>>  > > > >
>>>>>>>>>  > > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
>>>>>>>>>  > > > > > Thanks, John! I appreciate you adjusting my lingo. I made
>>>>>> the change to
>>>>>>>>>  > > > > the
>>>>>>>>>  > > > > > KIP. I will add the note about system time to the javadoc.
>>>>>>>>>  > > > > >
>>>>>>>>>  > > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <
>>>>>> vvcephei@apache.org>
>>>>>>>>>  > > > > wrote:
>>>>>>>>>  > > > > >
>>>>>>>>>  > > > > > > Hi Will,
>>>>>>>>>  > > > > > >
>>>>>>>>>  > > > > > > This proposal looks good to me overall. Thanks for the
>>>>>> contribution!
>>>>>>>>>  > > > > > >
>>>>>>>>>  > > > > > > Just a couple of minor notes:
>>>>>>>>>  > > > > > >
>>>>>>>>>  > > > > > > The system time method would return a cached timestamp
>>>>>> that Streams
>>>>>>>>>  > > > > looks
>>>>>>>>>  > > > > > > up once when it starts processing a record. This may be
>>>>>> confusing, so
>>>>>>>>>  > > > > it
>>>>>>>>>  > > > > > > might be good to state it in the javadoc.
>>>>>>>>>  > > > > > >
>>>>>>>>>  > > > > > > I thought the javadoc for the stream time might be a bit
>>>>>> confusing.
>>>>>>>>>  > > > We
>>>>>>>>>  > > > > > > normally talk about “Tasks” not “partition groups” in
>>>>> the
>>>>>> public api.
>>>>>>>>>  > > > > Maybe
>>>>>>>>>  > > > > > > just saying that it’s “the maximum timestamp of any
>>>>>> record yet
>>>>>>>>>  > > > > processed by
>>>>>>>>>  > > > > > > the task” would be both high level and accurate.
>>>>>>>>>  > > > > > >
>>>>>>>>>  > > > > > > Thanks again!
>>>>>>>>>  > > > > > > -John
>>>>>>>>>  > > > > > >
>>>>>>>>>  > > > > > > On Mon, Jun 22, 2020, at 02:10, William Bottrell wrote:
>>>>>>>>>  > > > > > > > Thanks, Bruno. I updated the KIP, so hopefully it
>>>>> makes
>>>>>> more sense.
>>>>>>>>>  > > > > > > Thanks
>>>>>>>>>  > > > > > > > to Matthias J. Sax and Piotr Smolinski for helping
>>>>> with
>>>>>> details.
>>>>>>>>>  > > > > > > >
>>>>>>>>>  > > > > > > > I welcome more feedback. Let me know if something
>>>>>> doesn't make
>>>>>>>>>  > > > sense
>>>>>>>>>  > > > > or I
>>>>>>>>>  > > > > > > > need to provide more detail. Also, feel free to
>>>>>> enlighten me.
>>>>>>>>>  > > > Thanks!
>>>>>>>>>  > > > > > > >
>>>>>>>>>  > > > > > > > On Thu, Jun 11, 2020 at 1:11 PM Bruno Cadonna <
>>>>>> bruno@confluent.io>
>>>>>>>>>  > > > > > > wrote:
>>>>>>>>>  > > > > > > >
>>>>>>>>>  > > > > > > > > Hi Will,
>>>>>>>>>  > > > > > > > >
>>>>>>>>>  > > > > > > > > Thank you for the KIP.
>>>>>>>>>  > > > > > > > >
>>>>>>>>>  > > > > > > > > 1. Could you elaborate a bit more on the motivation
>>>>>> in the KIP?
>>>>>>>>>  > > > An
>>>>>>>>>  > > > > > > > > example would make the motivation clearer.
>>>>>>>>>  > > > > > > > >
>>>>>>>>>  > > > > > > > > 2. In section "Proposed Changes" you do not need to
>>>>>> show the
>>>>>>>>>  > > > > > > > > implementation and describe internals. A description
>>>>>> of the
>>>>>>>>>  > > > > expected
>>>>>>>>>  > > > > > > > > behavior of the newly added methods should suffice.
>>>>>>>>>  > > > > > > > >
>>>>>>>>>  > > > > > > > > 3. In "Compatibility, Deprecation, and Migration
>>>>>> Plan" you should
>>>>>>>>>  > > > > > > > > state that the change is backward compatible because
>>>>>> the two
>>>>>>>>>  > > > > methods
>>>>>>>>>  > > > > > > > > will be added and no other method will be changed or
>>>>>> removed.
>>>>>>>>>  > > > > > > > >
>>>>>>>>>  > > > > > > > > Best,
>>>>>>>>>  > > > > > > > > Bruno
>>>>>>>>>  > > > > > > > >
>>>>>>>>>  > > > > > > > > On Wed, Jun 10, 2020 at 10:06 AM William Bottrell <
>>>>>>>>>  > > > > bottrellw@gmail.com
>>>>>>>>>  > > > > > > >
>>>>>>>>>  > > > > > > > > wrote:
>>>>>>>>>  > > > > > > > > >
>>>>>>>>>  > > > > > > > > > Add currentSystemTimeMs and currentStreamTimeMs to
>>>>>>>>>  > > > > ProcessorContext
>>>>>>>>>  > > > > > > > > > <
>>>>>>>>>  > > > > > > > >
>>>>>>>>>  > > > > > >
>>>>>>>>>  > > > >
>>>>>>>>>  > > >
>>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext
>>>>>>>>>  > > > > > > > > >
>>>>>>>>>  > > > > > > > > >
>>>>>>>>>  > > > > > > > > > I am extremely new to Kafka, but thank you to John
>>>>>> Roesler and
>>>>>>>>>  > > > > > > Matthias
>>>>>>>>>  > > > > > > > > J.
>>>>>>>>>  > > > > > > > > > Sax for pointing me in the right direction. I
>>>>>> accept any and
>>>>>>>>>  > > > all
>>>>>>>>>  > > > > > > > > feedback.
>>>>>>>>>  > > > > > > > > >
>>>>>>>>>  > > > > > > > > > Thanks,
>>>>>>>>>  > > > > > > > > > Will
>>>>>>>>>  > > > > > > > >
>>>>>>>>>  > > > > > > >
>>>>>>>>>  > > > > > >
>>>>>>>>>  > > > > >
>>>>>>>>>  > > > >
>>>>>>>>>  > > >
>>>>>>>>>  > >
>>>>>>>>>  >
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>


Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Posted by John Roesler <vv...@apache.org>.
Thanks Matthias,

This is a really good point. It might be a bummer
to have to actually change the topology between
testing and production. Do you think we could instead
evolve the TimestampExtractor interface to let
Streams pass in the current system time, along with
the current record and the current partition time?

For example, we could add a new method:
long extract(
  ConsumerRecord<Object, Object> record, 
  long partitionTime,
  long systemTime
);

Then, Streams could pass in the current system 
time and TopologyTestDriver could pass the mocked
time. Additionally, users who implement
TimestampExtractor on their own would be able to
deterministically unit-test their own implementation.

It's always a challenge to add to an interface without
breaking compatibility. In this case, it seems like
we could provide a default implementation that just
ignores the systemTime argument and calls
extract(record, partitionTime), and also deprecate
the existing method. Then custom implementations
would get a deprecation warning telling them to
implement the new method, and when we remove
the deprecated extract(record, partitionTime), we can
also drop the default implementation from the new
method.

Specifically, what do you think about:
=================================
public interface TimestampExtractor {
    /*...
     * @deprecated Since 2.7. Implement
     *   {@code extract(ConsumerRecord<Object, Object> record, long partitionTime, long systemTime)} instead.
     */
    @Deprecated
    long extract(
      ConsumerRecord<Object, Object> record,
      long partitionTime
    );

    default long extract(
      ConsumerRecord<Object, Object> record,
      long partitionTime,
      long systemTime) {
        return extract(record, partitionTime);
    }
}
=================================
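
To make the compatibility argument concrete, here is a rough, self-contained
sketch of how the default method would behave for an existing implementation
and for a new one. Everything in it is a stand-in invented for illustration:
SketchRecord replaces ConsumerRecord<Object, Object>, SketchTimestampExtractor
mirrors the interface shape above, and the two extractor classes are
hypothetical. It is not the real Kafka Streams code.

```java
// Hypothetical stand-in for ConsumerRecord<Object, Object>; only the timestamp matters here.
final class SketchRecord {
    final long timestamp;

    SketchRecord(long timestamp) {
        this.timestamp = timestamp;
    }
}

// Local mirror of the proposed interface shape (an assumption, not the real Streams type).
interface SketchTimestampExtractor {
    // Existing two-argument method, deprecated in the proposal.
    @Deprecated
    long extract(SketchRecord record, long partitionTime);

    // Proposed three-argument method; by default it ignores systemTime and delegates.
    default long extract(SketchRecord record, long partitionTime, long systemTime) {
        return extract(record, partitionTime);
    }
}

// An existing custom extractor that only implements the old method.
// It keeps compiling and keeps its behavior when called through the new method.
final class LegacyExtractor implements SketchTimestampExtractor {
    @Override
    public long extract(SketchRecord record, long partitionTime) {
        // Use the record timestamp if valid, otherwise fall back to partition time.
        return record.timestamp >= 0 ? record.timestamp : partitionTime;
    }
}

// A wall-clock style extractor that overrides the new method and returns
// whatever system time the caller passes in, so a test can supply a fixed value.
final class WallclockStyleExtractor implements SketchTimestampExtractor {
    @Override
    public long extract(SketchRecord record, long partitionTime) {
        // Old entry point: no system time is passed in, so read the real clock.
        return System.currentTimeMillis();
    }

    @Override
    public long extract(SketchRecord record, long partitionTime, long systemTime) {
        return systemTime;
    }
}

final class ExtractorSketchDemo {
    public static void main(String[] args) {
        SketchRecord record = new SketchRecord(42L);
        long mockedSystemTime = 1_000L;

        // The legacy extractor is reached through the default method.
        System.out.println(new LegacyExtractor().extract(record, 7L, mockedSystemTime));
        // The new-style extractor returns the time handed in by the caller.
        System.out.println(new WallclockStyleExtractor().extract(record, 7L, mockedSystemTime));
    }
}
```

The point of the sketch is only that a caller (Streams in production,
TopologyTestDriver in tests) decides which system time is passed in, so the
extractor itself never has to touch the real clock.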

Thanks,
-John

On Sun, Jul 26, 2020, at 15:47, Matthias J. Sax wrote:
> Hi,
> 
> I just had one more thought about an additional improvement we might
> want to include in this KIP.
> 
> Kafka Streams ships with a `WallclockTimestampExtractor` that just
> returns `System.currentTimeMillis()` and thus cannot be mocked. And it
> seems that there is no way for a user to build a custom timestamp
> extractor that returns the TTD's mocked system time.
> 
> Thus, it might be nice to add a `MockTimeExtractor` (only in the
> test-util package) that users could set, and that returns the mocked
> system time.
> 
> Thoughts?
> 
> 
> -Matthias
> 

Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Posted by "Matthias J. Sax" <mj...@apache.org>.
Hi,

I just had one more thought about an additional improvement we might
want to include in this KIP.

Kafka Streams ships with a `WallclockTimestampExtractor` that just
returns `System.currentTimeMillis()` and thus cannot be mocked. And it
seems that there is no way for a user to build a custom timestamp
extractor that returns the TTD's mocked system time.

Thus, it might be nice to add a `MockTimeExtractor` (only in the
test-util package) that users could set, and that returns the mocked
system time.
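
As a very rough sketch of the idea (all names here are hypothetical and
nothing below is existing Kafka Streams or test-utils API), such an extractor
could simply read from a clock object that the test controls:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical test clock; stands in for whatever mocked time the test driver holds.
final class SketchMockClock {
    private final AtomicLong nowMs;

    SketchMockClock(long startMs) {
        this.nowMs = new AtomicLong(startMs);
    }

    // Move the mocked wall-clock time forward by the given number of milliseconds.
    void advance(long deltaMs) {
        nowMs.addAndGet(deltaMs);
    }

    long milliseconds() {
        return nowMs.get();
    }
}

// Hypothetical extractor shape: ignores the record and partition time and
// returns the mocked system time instead of System.currentTimeMillis().
final class MockTimeExtractorSketch {
    private final SketchMockClock clock;

    MockTimeExtractorSketch(SketchMockClock clock) {
        this.clock = clock;
    }

    long extract(Object record, long partitionTime) {
        return clock.milliseconds();
    }
}

final class MockTimeExtractorDemo {
    public static void main(String[] args) {
        SketchMockClock clock = new SketchMockClock(100L);
        MockTimeExtractorSketch extractor = new MockTimeExtractorSketch(clock);

        System.out.println(extractor.extract(new Object(), -1L));
        clock.advance(50L);
        System.out.println(extractor.extract(new Object(), -1L));
    }
}
```

How the extractor would actually get hold of the TTD's clock is exactly the
open question, so the constructor argument above is only an assumption.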

Thoughts?


-Matthias

On 7/7/20 11:11 PM, Matthias J. Sax wrote:
> I think we don't need a default implementation for the new methods.
> 
> What would be the use-case to implement the `ProcessorContext`
> interface? In contrast to, for example, `KeyValueStore`,
> `ProcessorContext` is a use-only interface because it's never passed
> into Kafka Streams, but only handed out to the user.
> 
> 
> -Matthias
> 
> 
> On 7/7/20 1:28 PM, William Bottrell wrote:
>> Sure, I would appreciate help from Piotr creating an example.
>>
>> On Tue, Jul 7, 2020 at 12:03 PM Boyang Chen <re...@gmail.com>
>> wrote:
>>
>>> Hey John,
>>>
>>> since ProcessorContext is a public API, I couldn't be sure that people
>>> won't try to extend it. Without a default implementation, user code
>>> compilation will break.
>>>
>>> William and Piotr, it seems that we haven't added any example usage of the
>>> new API, could we try to address that? It should help with the motivation
>>> and follow-up meta comments as John proposed.
>>>
>>> Boyang
>>>
>>> On Mon, Jul 6, 2020 at 12:04 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>>> William,
>>>>
>>>> thanks for the KIP. LGTM. Feel free to start a vote.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 7/4/20 10:14 AM, John Roesler wrote:
>>>>> Hi Richard,
>>>>>
>>>>> It’s good to hear from you!
>>>>>
>>>>> Thanks for bringing up the wall-clock suppression feature. IIRC,
>>> someone
>>>> actually started a KIP discussion for it already, but I don’t think it
>>> went
>>>> to a vote. I don’t recall any technical impediment, just the lack of
>>>> availability to finish it up. Although there is some association, it
>>> would
>>>> be good to keep the KIPs separate.
>>>>>
>>>>> Thanks,
>>>>> John
>>>>>
>>>>> On Sat, Jul 4, 2020, at 10:05, Richard Yu wrote:
>>>>>> Hi all,
>>>>>>
>>>>>> This reminds me of a previous issue I think that we were discussing.
>>>>>> @John Roesler <ma...@apache.org> I think you should
>>> remember
>>>> this one.
>>>>>>
>>>>>> A while back, we were talking about having suppress operator emit
>>>>>> records by wall-clock time instead of stream time.
>>>>>> If we are adding this, wouldn't that make it more feasible for us to
>>>>>> implement that feature for suppression?
>>>>>>
>>>>>> If I recall correctly, there actually had been quite a bit of user
>>>>>> demand for such a feature.
>>>>>> Might be good to include it in this KIP (or maybe use one of the prior
>>>>>> KIPs for this feature).
>>>>>>
>>>>>> Best,
>>>>>> Richard
>>>>>>
>>>>>> On Sat, Jul 4, 2020 at 6:58 AM John Roesler <vv...@apache.org>
>>>> wrote:
>>>>>>> Hi all,
>>>>>>>
>>>>>>>  1. Thanks, Boyang, it is nice to see usage examples in KIPs like
>>>> this. It helps during the discussion, and it’s also good documentation
>>>> later on.
>>>>>>>
>>>>>>>  2. Yeah, this is a subtle point. The motivation mentions being able
>>>> to control the time during tests, but to be able to make it work, the
>>>> processor implementation needs a public method on ProcessorContext to get
>>>> the time. Otherwise, processors would have to check the type of the
>>> context
>>>> and cast, depending on whether they’re running inside a test or not. In
>>>> retrospect, if we’d had a usage example, this probably would have been
>>>> clear.
>>>>>>>
>>>>>>>  3. I don’t think we expect people to have their own implementations
>>>> of ProcessorContext. Since all implementations are internal, it’s really
>>> an
>>>> implementation detail whether we use a default method, abstract methods,
>>> or
>>>> concrete methods. I can’t think of an implementation that really wants to
>>>> just look up the system time. In the production code path, we cache the
>>>> time for performance, and in testing, we use a mock time.
>>>>>>>
>>>>>>>  Thanks,
>>>>>>>  John
>>>>>>>
>>>>>>>
>>>>>>>  On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
>>>>>>>  > 1. Makes sense; let me propose something
>>>>>>>  >
>>>>>>>  > 2. That's not testing-only. The goal is to use the same API to
>>>>>>>  > access the time in deployment and testing environments. The major
>>>>>>>  > driver is System.currentTimeMillis(), which a) cannot be used in
>>>>>>>  > tests, b) could in specific cases go backwards, and c) is not
>>>>>>>  > compatible with the punctuator call. The idea is that we could
>>>>>>>  > access the clock using a uniform API.
>>>>>>>  > For completeness we should have the same API for system and stream
>>>>>>>  > time.
>>>>>>>  >
>>>>>>>  > 3. There aren't that many subclasses. Two important ones are
>>>>>>>  > ProcessorContextImpl and
>>>>>>>  > MockProcessorContext (and third one:
>>>>>>>  > ForwardingDisableProcessorContext). If given
>>>>>>>  > implementation does not support schedule() call, there is no
>>> reason
>>>> to
>>>>>>>  > support clock access.
>>>>>>>  > The default implementation should just throw
>>>>>>>  > UnsupportedOperationException just to prevent
>>>>>>>  > from compilation errors in possible subclasses.
>>>>>>>  >
>>>>>>>  > On 2020/07/01 02:24:43, Boyang Chen <re...@gmail.com>
>>>> wrote:
>>>>>>>  > > Thanks Will for the KIP. A couple questions and suggestions:
>>>>>>>  > >
>>>>>>>  > > 1. I think for new APIs to make most sense, we should add a
>>>> minimal example
>>>>>>>  > > demonstrating how it could be useful to structure unit tests w/o
>>>> the new
>>>>>>>  > > APIs.
>>>>>>>  > > 2. If this is a testing-only feature, could we only add it
>>>>>>>  > > to MockProcessorContext?
>>>>>>>  > > 3. Regarding the API, since this will be added to the
>>>> ProcessorContext with
>>>>>>>  > > many subclasses, does it make sense to provide default
>>>> implementations as
>>>>>>>  > > well?
>>>>>>>  > >
>>>>>>>  > > Boyang
>>>>>>>  > >
>>>>>>>  > > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <
>>>> bottrellw@gmail.com>
>>>>>>>  > > wrote:
>>>>>>>  > >
>>>>>>>  > > > Thanks, John! I made the change. How much longer should I let
>>>> there be
>>>>>>>  > > > discussion before starting a VOTE?
>>>>>>>  > > >
>>>>>>>  > > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler <
>>>> vvcephei@apache.org> wrote:
>>>>>>>  > > >
>>>>>>>  > > > > Thanks, Will,
>>>>>>>  > > > >
>>>>>>>  > > > > That looks good to me. I would only add "cached" or
>>> something
>>>>>>>  > > > > to indicate that it wouldn't just transparently look up the
>>>> current
>>>>>>>  > > > > System.currentTimeMillis every time.
>>>>>>>  > > > >
>>>>>>>  > > > > For example:
>>>>>>>  > > > > /**
>>>>>>>  > > > > * Returns current cached wall-clock system timestamp in
>>>> milliseconds.
>>>>>>>  > > > > *
>>>>>>>  > > > > * @return the current cached wall-clock system timestamp in
>>>> milliseconds
>>>>>>>  > > > > */
>>>>>>>  > > > > long currentSystemTimeMs();
>>>>>>>  > > > >
>>>>>>>  > > > > I don't want to give specific information about _when_
>>>> exactly the
>>>>>>>  > > > > timestamp cache will be updated, so that we can adjust it in
>>>> the
>>>>>>>  > > > > future, but it does seem important to make people aware that
>>>> they
>>>>>>>  > > > > won't see the timestamp advance during the execution of
>>>>>>>  > > > > Processor.process(), for example.
>>>>>>>  > > > >
>>>>>>>  > > > > With that modification, I'll be +1 on this proposal.
>>>>>>>  > > > >
>>>>>>>  > > > > Thanks again for the KIP!
>>>>>>>  > > > > -John
>>>>>>>  > > > >
>>>>>>>  > > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
>>>>>>>  > > > > > Thanks, John! I appreciate you adjusting my lingo. I made
>>>> the change to
>>>>>>>  > > > > the
>>>>>>>  > > > > > KIP. I will add the note about system time to the javadoc.
>>>>>>>  > > > > >
>>>>>>>  > > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <
>>>> vvcephei@apache.org>
>>>>>>>  > > > > wrote:
>>>>>>>  > > > > >
>>>>>>>  > > > > > > Hi Will,
>>>>>>>  > > > > > >
>>>>>>>  > > > > > > This proposal looks good to me overall. Thanks for the
>>>> contribution!
>>>>>>>  > > > > > >
>>>>>>>  > > > > > > Just a couple of minor notes:
>>>>>>>  > > > > > >
>>>>>>>  > > > > > > The system time method would return a cached timestamp
>>>> that Streams
>>>>>>>  > > > > looks
>>>>>>>  > > > > > > up once when it starts processing a record. This may be
>>>> confusing, so
>>>>>>>  > > > > it
>>>>>>>  > > > > > > might be good to state it in the javadoc.
>>>>>>>  > > > > > >
>>>>>>>  > > > > > > I thought the javadoc for the stream time might be a bit
>>>> confusing.
>>>>>>>  > > > We
>>>>>>>  > > > > > > normally talk about “Tasks” not “partition groups” in
>>> the
>>>> public api.
>>>>>>>  > > > > Maybe
>>>>>>>  > > > > > > just saying that it’s “the maximum timestamp of any
>>>> record yet
>>>>>>>  > > > > processed by
>>>>>>>  > > > > > > the task” would be both high level and accurate.
>>>>>>>  > > > > > >
>>>>>>>  > > > > > > Thanks again!
>>>>>>>  > > > > > > -John
>>>>>>>  > > > > > >
>>>>>>>  > > > > > > On Mon, Jun 22, 2020, at 02:10, William Bottrell wrote:
>>>>>>>  > > > > > > > Thanks, Bruno. I updated the KIP, so hopefully it
>>> makes
>>>> more sense.
>>>>>>>  > > > > > > Thanks
>>>>>>>  > > > > > > > to Matthias J. Sax and Piotr Smolinski for helping
>>> with
>>>> details.
>>>>>>>  > > > > > > >
>>>>>>>  > > > > > > > I welcome more feedback. Let me know if something
>>>> doesn't make
>>>>>>>  > > > sense
>>>>>>>  > > > > or I
>>>>>>>  > > > > > > > need to provide more detail. Also, feel free to
>>>> enlighten me.
>>>>>>>  > > > Thanks!
>>>>>>>  > > > > > > >
>>>>>>>  > > > > > > > On Thu, Jun 11, 2020 at 1:11 PM Bruno Cadonna <
>>>> bruno@confluent.io>
>>>>>>>  > > > > > > wrote:
>>>>>>>  > > > > > > >
>>>>>>>  > > > > > > > > Hi Will,
>>>>>>>  > > > > > > > >
>>>>>>>  > > > > > > > > Thank you for the KIP.
>>>>>>>  > > > > > > > >
>>>>>>>  > > > > > > > > 1. Could you elaborate a bit more on the motivation
>>>> in the KIP?
>>>>>>>  > > > An
>>>>>>>  > > > > > > > > example would make the motivation clearer.
>>>>>>>  > > > > > > > >
>>>>>>>  > > > > > > > > 2. In section "Proposed Changes" you do not need to
>>>> show the
>>>>>>>  > > > > > > > > implementation and describe internals. A description
>>>> of the
>>>>>>>  > > > > expected
>>>>>>>  > > > > > > > > behavior of the newly added methods should suffice.
>>>>>>>  > > > > > > > >
>>>>>>>  > > > > > > > > 3. In "Compatibility, Deprecation, and Migration
>>>> Plan" you should
>>>>>>>  > > > > > > > > state that the change is backward compatible because
>>>> the two
>>>>>>>  > > > > methods
>>>>>>>  > > > > > > > > will be added and no other method will be changed or
>>>> removed.
>>>>>>>  > > > > > > > >
>>>>>>>  > > > > > > > > Best,
>>>>>>>  > > > > > > > > Bruno
>>>>>>>  > > > > > > > >
>>>>>>>  > > > > > > > > On Wed, Jun 10, 2020 at 10:06 AM William Bottrell <
>>>>>>>  > > > > bottrellw@gmail.com
>>>>>>>  > > > > > > >
>>>>>>>  > > > > > > > > wrote:
>>>>>>>  > > > > > > > > >
>>>>>>>  > > > > > > > > > Add currentSystemTimeMs and currentStreamTimeMs to
>>>>>>>  > > > > ProcessorContext
>>>>>>>  > > > > > > > > > <
>>>>>>>  > > > > > > > >
>>>>>>>  > > > > > >
>>>>>>>  > > > >
>>>>>>>  > > >
>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext
>>>>>>>  > > > > > > > > >
>>>>>>>  > > > > > > > > >
>>>>>>>  > > > > > > > > > I am extremely new to Kafka, but thank you to John
>>>> Roesler and
>>>>>>>  > > > > > > Matthias
>>>>>>>  > > > > > > > > J.
>>>>>>>  > > > > > > > > > Sax for pointing me in the right direction. I
>>>> accept any and
>>>>>>>  > > > all
>>>>>>>  > > > > > > > > feedback.
>>>>>>>  > > > > > > > > >
>>>>>>>  > > > > > > > > > Thanks,
>>>>>>>  > > > > > > > > > Will
>>>>>>>  > > > > > > > >
>>>>>>>  > > > > > > >
>>>>>>>  > > > > > >
>>>>>>>  > > > > >
>>>>>>>  > > > >
>>>>>>>  > > >
>>>>>>>  > >
>>>>>>>  >
>>>>
>>>>
>>>
>>
> 


Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Posted by "Matthias J. Sax" <mj...@apache.org>.
I think we don't need a default implementation for the new methods.

What would be the use case for implementing the `ProcessorContext`
interface? In contrast to, for example, `KeyValueStore`,
`ProcessorContext` is a use-only interface because it's never passed
into Kafka Streams, but only handed out to the user.


-Matthias


On 7/7/20 1:28 PM, William Bottrell wrote:
> Sure, I would appreciate help from Piotr creating an example.
> 
> On Tue, Jul 7, 2020 at 12:03 PM Boyang Chen <re...@gmail.com>
> wrote:
> 
>> Hey John,
>>
>> since ProcessorContext is a public API, I couldn't be sure that people
>> won't try to extend it. Without a default implementation, user code
>> compilation will break.
>>
>> William and Piotr, it seems that we haven't added any example usage of the
>> new API, could we try to address that? It should help with the motivation
>> and follow-up meta comments as John proposed.
>>
>> Boyang
>>
>> On Mon, Jul 6, 2020 at 12:04 PM Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> William,
>>>
>>> thanks for the KIP. LGTM. Feel free to start a vote.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 7/4/20 10:14 AM, John Roesler wrote:
>>>> Hi Richard,
>>>>
>>>> It’s good to hear from you!
>>>>
>>>> Thanks for bringing up the wall-clock suppression feature. IIRC,
>> someone
>>> actually started a KIP discussion for it already, but I don’t think it
>> went
>>> to a vote. I don’t recall any technical impediment, just the lack of
>>> availability to finish it up. Although there is some association, it
>> would
>>> be good to keep the KIPs separate.
>>>>
>>>> Thanks,
>>>> John
>>>>
>>>> On Sat, Jul 4, 2020, at 10:05, Richard Yu wrote:
>>>>> Hi all,
>>>>>
>>>>> This reminds me of a previous issue I think that we were discussing.
>>>>> @John Roesler <ma...@apache.org> I think you should
>> remember
>>> this one.
>>>>>
>>>>> A while back, we were talking about having suppress operator emit
>>>>> records by wall-clock time instead of stream time.
>>>>> If we are adding this, wouldn't that make it more feasible for us to
>>>>> implement that feature for suppression?
>>>>>
>>>>> If I recall correctly, there actually had been quite a bit of user
>>>>> demand for such a feature.
>>>>> Might be good to include it in this KIP (or maybe use one of the prior
>>>>> KIPs for this feature).
>>>>>
>>>>> Best,
>>>>> Richard
>>>>>
>>>>> On Sat, Jul 4, 2020 at 6:58 AM John Roesler <vv...@apache.org>
>>> wrote:
>>>>>> Hi all,
>>>>>>
>>>>>>  1. Thanks, Boyang, it is nice to see usage examples in KIPs like
>>> this. It helps during the discussion, and it’s also good documentation
>>> later on.
>>>>>>
>>>>>>  2. Yeah, this is a subtle point. The motivation mentions being able
>>> to control the time during tests, but to be able to make it work, the
>>> processor implementation needs a public method on ProcessorContext to get
>>> the time. Otherwise, processors would have to check the type of the
>> context
>>> and cast, depending on whether they’re running inside a test or not. In
>>> retrospect, if we’d had a usage example, this probably would have been
>>> clear.
>>>>>>
>>>>>>  3. I don’t think we expect people to have their own implementations
>>> of ProcessorContext. Since all implementations are internal, it’s really
>> an
>>> implementation detail whether we use a default method, abstract methods,
>> or
>>> concrete methods. I can’t think of an implementation that really wants to
>>> just look up the system time. In the production code path, we cache the
>>> time for performance, and in testing, we use a mock time.
>>>>>>
>>>>>>  Thanks,
>>>>>>  John
>>>>>>
>>>>>>
>>>>>>  On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
>>>>>>  > 1. Makes sense; let me propose something
>>>>>>  >
>>>>>>  > 2. That's not testing-only. The goal is to use the same API to
>>> access
>>>>>>  > the time
>>>>>>  > in deployment and testing environments. The major driver is
>>>>>>  > System.currentTimeMillis(),
>>>>>>  > which a) cannot be used in tests b) could go in specific cases
>> back
>>> c)
>>>>>>  > is not compatible
>>>>>>  > with punctuator call. The idea is that we could access clock using
>>>>>>  > uniform API.
>>>>>>  > For completeness we should have the same API for system and stream
>> time.
>>>>>>  >
>>>>>>  > 3. There aren't that many subclasses. Two important ones are
>>>>>>  > ProcessorContextImpl and
>>>>>>  > MockProcessorContext (and third one:
>>>>>>  > ForwardingDisableProcessorContext). If given
>>>>>>  > implementation does not support schedule() call, there is no
>> reason
>>> to
>>>>>>  > support clock access.
>>>>>>  > The default implementation should just throw
>>>>>>  > UnsupportedOperationException just to prevent
>>>>>>  > from compilation errors in possible subclasses.
>>>>>>  >
>>>>>>  > On 2020/07/01 02:24:43, Boyang Chen <re...@gmail.com>
>>> wrote:
>>>>>>  > > Thanks Will for the KIP. A couple questions and suggestions:
>>>>>>  > >
>>>>>>  > > 1. I think for new APIs to make most sense, we should add a
>>> minimal example
>>>>>>  > > demonstrating how it could be useful to structure unit tests w/o
>>> the new
>>>>>>  > > APIs.
>>>>>>  > > 2. If this is a testing-only feature, could we only add it
>>>>>>  > > to MockProcessorContext?
>>>>>>  > > 3. Regarding the API, since this will be added to the
>>> ProcessorContext with
>>>>>>  > > many subclasses, does it make sense to provide default
>>> implementations as
>>>>>>  > > well?
>>>>>>  > >
>>>>>>  > > Boyang
>>>>>>  > >
>>>>>>  > > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <
>>> bottrellw@gmail.com>
>>>>>>  > > wrote:
>>>>>>  > >
>>>>>>  > > > Thanks, John! I made the change. How much longer should I let
>>> there be
>>>>>>  > > > discussion before starting a VOTE?
>>>>>>  > > >
>>>>>>  > > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler <
>>> vvcephei@apache.org> wrote:
>>>>>>  > > >
>>>>>>  > > > > Thanks, Will,
>>>>>>  > > > >
>>>>>>  > > > > That looks good to me. I would only add "cached" or
>> something
>>>>>>  > > > > to indicate that it wouldn't just transparently look up the
>>> current
>>>>>>  > > > > System.currentTimeMillis every time.
>>>>>>  > > > >
>>>>>>  > > > > For example:
>>>>>>  > > > > /**
>>>>>>  > > > > * Returns current cached wall-clock system timestamp in
>>> milliseconds.
>>>>>>  > > > > *
>>>>>>  > > > > * @return the current cached wall-clock system timestamp in
>>> milliseconds
>>>>>>  > > > > */
>>>>>>  > > > > long currentSystemTimeMs();
>>>>>>  > > > >
>>>>>>  > > > > I don't want to give specific information about _when_
>>> exactly the
>>>>>>  > > > > timestamp cache will be updated, so that we can adjust it in
>>> the
>>>>>>  > > > > future, but it does seem important to make people aware that
>>> they
>>>>>>  > > > > won't see the timestamp advance during the execution of
>>>>>>  > > > > Processor.process(), for example.
>>>>>>  > > > >
>>>>>>  > > > > With that modification, I'll be +1 on this proposal.
>>>>>>  > > > >
>>>>>>  > > > > Thanks again for the KIP!
>>>>>>  > > > > -John
>>>>>>  > > > >
>>>>>>  > > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
>>>>>>  > > > > > Thanks, John! I appreciate you adjusting my lingo. I made
>>> the change to
>>>>>>  > > > > the
>>>>>>  > > > > > KIP. I will add the note about system time to the javadoc.
>>>>>>  > > > > >
>>>>>>  > > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <
>>> vvcephei@apache.org>
>>>>>>  > > > > wrote:
>>>>>>  > > > > >
>>>>>>  > > > > > > Hi Will,
>>>>>>  > > > > > >
>>>>>>  > > > > > > This proposal looks good to me overall. Thanks for the
>>> contribution!
>>>>>>  > > > > > >
>>>>>>  > > > > > > Just a couple of minor notes:
>>>>>>  > > > > > >
>>>>>>  > > > > > > The system time method would return a cached timestamp
>>> that Streams
>>>>>>  > > > > looks
>>>>>>  > > > > > > up once when it starts processing a record. This may be
>>> confusing, so
>>>>>>  > > > > it
>>>>>>  > > > > > > might be good to state it in the javadoc.
>>>>>>  > > > > > >
>>>>>>  > > > > > > I thought the javadoc for the stream time might be a bit
>>> confusing.
>>>>>>  > > > We
>>>>>>  > > > > > > normally talk about “Tasks” not “partition groups” in
>> the
>>> public api.
>>>>>>  > > > > Maybe
>>>>>>  > > > > > > just saying that it’s “the maximum timestamp of any
>>> record yet
>>>>>>  > > > > processed by
>>>>>>  > > > > > > the task” would be both high level and accurate.
>>>>>>  > > > > > >
>>>>>>  > > > > > > Thanks again!
>>>>>>  > > > > > > -John
>>>>>>  > > > > > >
>>>>>>  > > > > > > On Mon, Jun 22, 2020, at 02:10, William Bottrell wrote:
>>>>>>  > > > > > > > Thanks, Bruno. I updated the KIP, so hopefully it
>> makes
>>> more sense.
>>>>>>  > > > > > > Thanks
>>>>>>  > > > > > > > to Matthias J. Sax and Piotr Smolinski for helping
>> with
>>> details.
>>>>>>  > > > > > > >
>>>>>>  > > > > > > > I welcome more feedback. Let me know if something
>>> doesn't make
>>>>>>  > > > sense
>>>>>>  > > > > or I
>>>>>>  > > > > > > > need to provide more detail. Also, feel free to
>>> enlighten me.
>>>>>>  > > > Thanks!
>>>>>>  > > > > > > >
>>>>>>  > > > > > > > On Thu, Jun 11, 2020 at 1:11 PM Bruno Cadonna <
>>> bruno@confluent.io>
>>>>>>  > > > > > > wrote:
>>>>>>  > > > > > > >
>>>>>>  > > > > > > > > Hi Will,
>>>>>>  > > > > > > > >
>>>>>>  > > > > > > > > Thank you for the KIP.
>>>>>>  > > > > > > > >
>>>>>>  > > > > > > > > 1. Could you elaborate a bit more on the motivation
>>> in the KIP?
>>>>>>  > > > An
>>>>>>  > > > > > > > > example would make the motivation clearer.
>>>>>>  > > > > > > > >
>>>>>>  > > > > > > > > 2. In section "Proposed Changes" you do not need to
>>> show the
>>>>>>  > > > > > > > > implementation and describe internals. A description
>>> of the
>>>>>>  > > > > expected
>>>>>>  > > > > > > > > behavior of the newly added methods should suffice.
>>>>>>  > > > > > > > >
>>>>>>  > > > > > > > > 3. In "Compatibility, Deprecation, and Migration
>>> Plan" you should
>>>>>>  > > > > > > > > state that the change is backward compatible because
>>> the two
>>>>>>  > > > > methods
>>>>>>  > > > > > > > > will be added and no other method will be changed or
>>> removed.
>>>>>>  > > > > > > > >
>>>>>>  > > > > > > > > Best,
>>>>>>  > > > > > > > > Bruno
>>>>>>  > > > > > > > >
>>>>>>  > > > > > > > > On Wed, Jun 10, 2020 at 10:06 AM William Bottrell <
>>>>>>  > > > > bottrellw@gmail.com
>>>>>>  > > > > > > >
>>>>>>  > > > > > > > > wrote:
>>>>>>  > > > > > > > > >
>>>>>>  > > > > > > > > > Add currentSystemTimeMs and currentStreamTimeMs to
>>>>>>  > > > > ProcessorContext
>>>>>>  > > > > > > > > > <
>>>>>>  > > > > > > > >
>>>>>>  > > > > > >
>>>>>>  > > > >
>>>>>>  > > >
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext
>>>>>>  > > > > > > > > >
>>>>>>  > > > > > > > > >
>>>>>>  > > > > > > > > > I am extremely new to Kafka, but thank you to John
>>> Roesler and
>>>>>>  > > > > > > Matthias
>>>>>>  > > > > > > > > J.
>>>>>>  > > > > > > > > > Sax for pointing me in the right direction. I
>>> accept any and
>>>>>>  > > > all
>>>>>>  > > > > > > > > feedback.
>>>>>>  > > > > > > > > >
>>>>>>  > > > > > > > > > Thanks,
>>>>>>  > > > > > > > > > Will
>>>>>>  > > > > > > > >
>>>>>>  > > > > > > >
>>>>>>  > > > > > >
>>>>>>  > > > > >
>>>>>>  > > > >
>>>>>>  > > >
>>>>>>  > >
>>>>>>  >
>>>
>>>
>>
> 


Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Posted by William Bottrell <bo...@gmail.com>.
Sure, I would appreciate help from Piotr creating an example.

On Tue, Jul 7, 2020 at 12:03 PM Boyang Chen <re...@gmail.com>
wrote:

> Hey John,
>
> since ProcessorContext is a public API, I couldn't be sure that people
> won't try to extend it. Without a default implementation, user code
> compilation will break.
>
> William and Piotr, it seems that we haven't added any example usage of the
> new API, could we try to address that? It should help with the motivation
> and follow-up meta comments as John proposed.
>
> Boyang
>
> On Mon, Jul 6, 2020 at 12:04 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > William,
> >
> > thanks for the KIP. LGTM. Feel free to start a vote.
> >
> >
> > -Matthias
> >
> >
> > On 7/4/20 10:14 AM, John Roesler wrote:
> > > Hi Richard,
> > >
> > > It’s good to hear from you!
> > >
> > > Thanks for bringing up the wall-clock suppression feature. IIRC,
> someone
> > actually started a KIP discussion for it already, but I don’t think it
> went
> > to a vote. I don’t recall any technical impediment, just the lack of
> > availability to finish it up. Although there is some association, it
> would
> > be good to keep the KIPs separate.
> > >
> > > Thanks,
> > > John
> > >
> > > On Sat, Jul 4, 2020, at 10:05, Richard Yu wrote:
> > >> Hi all,
> > >>
> > >> This reminds me of a previous issue I think that we were discussing.
> > >> @John Roesler <ma...@apache.org> I think you should
> remember
> > this one.
> > >>
> > >> A while back, we were talking about having suppress operator emit
> > >> records by wall-clock time instead of stream time.
> > >> If we are adding this, wouldn't that make it more feasible for us to
> > >> implement that feature for suppression?
> > >>
> > >> If I recall correctly, there actually had been quite a bit of user
> > >> demand for such a feature.
> > >> Might be good to include it in this KIP (or maybe use one of the prior
> > >> KIPs for this feature).
> > >>
> > >> Best,
> > >> Richard
> > >>
> > >> On Sat, Jul 4, 2020 at 6:58 AM John Roesler <vv...@apache.org>
> > wrote:
> > >>> Hi all,
> > >>>
> > >>>  1. Thanks, Boyang, it is nice to see usage examples in KIPs like
> > this. It helps during the discussion, and it’s also good documentation
> > later on.
> > >>>
> > >>>  2. Yeah, this is a subtle point. The motivation mentions being able
> > to control the time during tests, but to be able to make it work, the
> > processor implementation needs a public method on ProcessorContext to get
> > the time. Otherwise, processors would have to check the type of the
> context
> > and cast, depending on whether they’re running inside a test or not. In
> > retrospect, if we’d had a usage example, this probably would have been
> > clear.
> > >>>
> > >>>  3. I don’t think we expect people to have their own implementations
> > of ProcessorContext. Since all implementations are internal, it’s really
> an
> > implementation detail whether we use a default method, abstract methods,
> or
> > concrete methods. I can’t think of an implementation that really wants to
> > just look up the system time. In the production code path, we cache the
> > time for performance, and in testing, we use a mock time.
> > >>>
> > >>>  Thanks,
> > >>>  John
> > >>>
> > >>>
> > >>>  On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
> > >>>  > 1. Makes sense; let me propose something
> > >>>  >
> > >>>  > 2. That's not testing-only. The goal is to use the same API to
> > access
> > >>>  > the time
> > >>>  > in deployment and testing environments. The major driver is
> > >>>  > System.currentTimeMillis(),
> > >>>  > which a) cannot be used in tests b) could go in specific cases
> back
> > c)
> > >>>  > is not compatible
> > >>>  > with punctuator call. The idea is that we could access clock using
> > >>>  > uniform API.
> > >>>  > For completeness we should have the same API for system and stream
> time.
> > >>>  >
> > >>>  > 3. There aren't that many subclasses. Two important ones are
> > >>>  > ProcessorContextImpl and
> > >>>  > MockProcessorContext (and third one:
> > >>>  > ForwardingDisableProcessorContext). If given
> > >>>  > implementation does not support schedule() call, there is no
> reason
> > to
> > >>>  > support clock access.
> > >>>  > The default implementation should just throw
> > >>>  > UnsupportedOperationException just to prevent
> > >>>  > from compilation errors in possible subclasses.
> > >>>  >
> > >>>  > On 2020/07/01 02:24:43, Boyang Chen <re...@gmail.com>
> > wrote:
> > >>>  > > Thanks Will for the KIP. A couple questions and suggestions:
> > >>>  > >
> > >>>  > > 1. I think for new APIs to make most sense, we should add a
> > minimal example
> > >>>  > > demonstrating how it could be useful to structure unit tests w/o
> > the new
> > >>>  > > APIs.
> > >>>  > > 2. If this is a testing-only feature, could we only add it
> > >>>  > > to MockProcessorContext?
> > >>>  > > 3. Regarding the API, since this will be added to the
> > ProcessorContext with
> > >>>  > > many subclasses, does it make sense to provide default
> > implementations as
> > >>>  > > well?
> > >>>  > >
> > >>>  > > Boyang
> > >>>  > >
> > >>>  > > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <
> > bottrellw@gmail.com>
> > >>>  > > wrote:
> > >>>  > >
> > >>>  > > > Thanks, John! I made the change. How much longer should I let
> > there be
> > >>>  > > > discussion before starting a VOTE?
> > >>>  > > >
> > >>>  > > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler <
> > vvcephei@apache.org> wrote:
> > >>>  > > >
> > >>>  > > > > Thanks, Will,
> > >>>  > > > >
> > >>>  > > > > That looks good to me. I would only add "cached" or
> something
> > >>>  > > > > to indicate that it wouldn't just transparently look up the
> > current
> > >>>  > > > > System.currentTimeMillis every time.
> > >>>  > > > >
> > >>>  > > > > For example:
> > >>>  > > > > /**
> > >>>  > > > > * Returns current cached wall-clock system timestamp in
> > milliseconds.
> > >>>  > > > > *
> > >>>  > > > > * @return the current cached wall-clock system timestamp in
> > milliseconds
> > >>>  > > > > */
> > >>>  > > > > long currentSystemTimeMs();
> > >>>  > > > >
> > >>>  > > > > I don't want to give specific information about _when_
> > exactly the
> > >>>  > > > > timestamp cache will be updated, so that we can adjust it in
> > the
> > >>>  > > > > future, but it does seem important to make people aware that
> > they
> > >>>  > > > > won't see the timestamp advance during the execution of
> > >>>  > > > > Processor.process(), for example.
> > >>>  > > > >
> > >>>  > > > > With that modification, I'll be +1 on this proposal.
> > >>>  > > > >
> > >>>  > > > > Thanks again for the KIP!
> > >>>  > > > > -John
> > >>>  > > > >
> > >>>  > > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
> > >>>  > > > > > Thanks, John! I appreciate you adjusting my lingo. I made
> > the change to
> > >>>  > > > > the
> > >>>  > > > > > KIP. I will add the note about system time to the javadoc.
> > >>>  > > > > >
> > >>>  > > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <
> > vvcephei@apache.org>
> > >>>  > > > > wrote:
> > >>>  > > > > >
> > >>>  > > > > > > Hi Will,
> > >>>  > > > > > >
> > >>>  > > > > > > This proposal looks good to me overall. Thanks for the
> > contribution!
> > >>>  > > > > > >
> > >>>  > > > > > > Just a couple of minor notes:
> > >>>  > > > > > >
> > >>>  > > > > > > The system time method would return a cached timestamp
> > that Streams
> > >>>  > > > > looks
> > >>>  > > > > > > up once when it starts processing a record. This may be
> > confusing, so
> > >>>  > > > > it
> > >>>  > > > > > > might be good to state it in the javadoc.
> > >>>  > > > > > >
> > >>>  > > > > > > I thought the javadoc for the stream time might be a bit
> > confusing.
> > >>>  > > > We
> > >>>  > > > > > > normally talk about “Tasks” not “partition groups” in
> the
> > public api.
> > >>>  > > > > Maybe
> > >>>  > > > > > > just saying that it’s “the maximum timestamp of any
> > record yet
> > >>>  > > > > processed by
> > >>>  > > > > > > the task” would be both high level and accurate.
> > >>>  > > > > > >
> > >>>  > > > > > > Thanks again!
> > >>>  > > > > > > -John
> > >>>  > > > > > >
> > >>>  > > > > > > On Mon, Jun 22, 2020, at 02:10, William Bottrell wrote:
> > >>>  > > > > > > > Thanks, Bruno. I updated the KIP, so hopefully it
> makes
> > more sense.
> > >>>  > > > > > > Thanks
> > >>>  > > > > > > > to Matthias J. Sax and Piotr Smolinski for helping
> with
> > details.
> > >>>  > > > > > > >
> > >>>  > > > > > > > I welcome more feedback. Let me know if something
> > doesn't make
> > >>>  > > > sense
> > >>>  > > > > or I
> > >>>  > > > > > > > need to provide more detail. Also, feel free to
> > enlighten me.
> > >>>  > > > Thanks!
> > >>>  > > > > > > >
> > >>>  > > > > > > > On Thu, Jun 11, 2020 at 1:11 PM Bruno Cadonna <
> > bruno@confluent.io>
> > >>>  > > > > > > wrote:
> > >>>  > > > > > > >
> > >>>  > > > > > > > > Hi Will,
> > >>>  > > > > > > > >
> > >>>  > > > > > > > > Thank you for the KIP.
> > >>>  > > > > > > > >
> > >>>  > > > > > > > > 1. Could you elaborate a bit more on the motivation
> > in the KIP?
> > >>>  > > > An
> > >>>  > > > > > > > > example would make the motivation clearer.
> > >>>  > > > > > > > >
> > >>>  > > > > > > > > 2. In section "Proposed Changes" you do not need to
> > show the
> > >>>  > > > > > > > > implementation and describe internals. A description
> > of the
> > >>>  > > > > expected
> > >>>  > > > > > > > > behavior of the newly added methods should suffice.
> > >>>  > > > > > > > >
> > >>>  > > > > > > > > 3. In "Compatibility, Deprecation, and Migration
> > Plan" you should
> > >>>  > > > > > > > > state that the change is backward compatible because
> > the two
> > >>>  > > > > methods
> > >>>  > > > > > > > > will be added and no other method will be changed or
> > removed.
> > >>>  > > > > > > > >
> > >>>  > > > > > > > > Best,
> > >>>  > > > > > > > > Bruno
> > >>>  > > > > > > > >
> > >>>  > > > > > > > > On Wed, Jun 10, 2020 at 10:06 AM William Bottrell <
> > >>>  > > > > bottrellw@gmail.com
> > >>>  > > > > > > >
> > >>>  > > > > > > > > wrote:
> > >>>  > > > > > > > > >
> > >>>  > > > > > > > > > Add currentSystemTimeMs and currentStreamTimeMs to
> > >>>  > > > > ProcessorContext
> > >>>  > > > > > > > > > <
> > >>>  > > > > > > > >
> > >>>  > > > > > >
> > >>>  > > > >
> > >>>  > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext
> > >>>  > > > > > > > > >
> > >>>  > > > > > > > > >
> > >>>  > > > > > > > > > I am extremely new to Kafka, but thank you to John
> > Roesler and
> > >>>  > > > > > > Matthias
> > >>>  > > > > > > > > J.
> > >>>  > > > > > > > > > Sax for pointing me in the right direction. I
> > accept any and
> > >>>  > > > all
> > >>>  > > > > > > > > feedback.
> > >>>  > > > > > > > > >
> > >>>  > > > > > > > > > Thanks,
> > >>>  > > > > > > > > > Will
> > >>>  > > > > > > > >
> > >>>  > > > > > > >
> > >>>  > > > > > >
> > >>>  > > > > >
> > >>>  > > > >
> > >>>  > > >
> > >>>  > >
> > >>>  >
> >
> >
>

Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Posted by Boyang Chen <re...@gmail.com>.
Hey John,

since ProcessorContext is a public API, I couldn't be sure that people
won't try to extend it. Without a default implementation, user code
compilation will break.
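
For illustration, here is a minimal Java sketch of the concern above.
SketchContext and LegacyUserContext are hypothetical stand-ins, not the
real ProcessorContext. The default body that throws
UnsupportedOperationException follows Piotr's earlier suggestion in this
thread and is only one possible choice:

```java
// Hypothetical, simplified stand-in for ProcessorContext (an assumption for
// this sketch; it is not the real Kafka Streams interface).
interface SketchContext {
    String applicationId();

    // Newly added method with a default body: implementations written before
    // the method existed still compile without any change.
    default long currentSystemTimeMs() {
        throw new UnsupportedOperationException(
            "currentSystemTimeMs is not supported by this context");
    }
}

// A user-written implementation that predates the new method and therefore
// does not override it.
class LegacyUserContext implements SketchContext {
    @Override
    public String applicationId() {
        return "legacy-app";
    }
}

class DefaultMethodSketch {
    public static void main(String[] args) {
        SketchContext context = new LegacyUserContext();
        System.out.println(context.applicationId());
        try {
            context.currentSystemTimeMs();
        } catch (UnsupportedOperationException e) {
            // The legacy class compiles, but the call fails at runtime.
            System.out.println("unsupported: " + e.getMessage());
        }
    }
}
```

If the method were declared without a default body, LegacyUserContext
would no longer compile until it implemented currentSystemTimeMs().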

William and Piotr, it seems that we haven't added any example usage of the
new API. Could we try to address that? It should help with the motivation
and follow-up meta comments as John proposed.
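
As a starting point, a rough sketch of what such a usage example might
look like. Everything here is hypothetical: TimeAwareContext stands in
for ProcessorContext with only the two proposed methods, FixedTimeContext
stands in for a mock context, and LatenessChecker is an invented piece of
processor logic. The point is only that the processor reads time through
the context, so a test can control the clock:

```java
// Hypothetical, minimal stand-in exposing only the two methods proposed in
// KIP-622 (not the real ProcessorContext).
interface TimeAwareContext {
    long currentSystemTimeMs();
    long currentStreamTimeMs();
}

// Test double whose clocks are set explicitly by the test (an assumption for
// this sketch; it is not the real MockProcessorContext).
class FixedTimeContext implements TimeAwareContext {
    private long systemTimeMs;
    private long streamTimeMs;

    void setSystemTimeMs(long systemTimeMs) { this.systemTimeMs = systemTimeMs; }
    void setStreamTimeMs(long streamTimeMs) { this.streamTimeMs = streamTimeMs; }

    @Override public long currentSystemTimeMs() { return systemTimeMs; }
    @Override public long currentStreamTimeMs() { return streamTimeMs; }
}

// Processor-side logic that reads time only through the context, so the same
// code runs unchanged in production and under test.
class LatenessChecker {
    private final TimeAwareContext context;
    private final long maxLagMs;

    LatenessChecker(TimeAwareContext context, long maxLagMs) {
        this.context = context;
        this.maxLagMs = maxLagMs;
    }

    // True if the record timestamp lags the wall clock by more than maxLagMs.
    boolean isLate(long recordTimestampMs) {
        return context.currentSystemTimeMs() - recordTimestampMs > maxLagMs;
    }
}

class UsageSketch {
    public static void main(String[] args) {
        FixedTimeContext context = new FixedTimeContext();
        context.setSystemTimeMs(10_000L);
        LatenessChecker checker = new LatenessChecker(context, 1_000L);
        System.out.println(checker.isLate(9_500L)); // lag of 500 ms
        System.out.println(checker.isLate(8_000L)); // lag of 2000 ms
    }
}
```

With System.currentTimeMillis() called directly inside isLate(), the test
could not pin the wall clock to a known value.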

Boyang

On Mon, Jul 6, 2020 at 12:04 PM Matthias J. Sax <mj...@apache.org> wrote:

> William,
>
> thanks for the KIP. LGTM. Feel free to start a vote.
>
>
> -Matthias
>
>
> On 7/4/20 10:14 AM, John Roesler wrote:
> > Hi Richard,
> >
> > It’s good to hear from you!
> >
> > Thanks for bringing up the wall-clock suppression feature. IIRC, someone
> actually started a KIP discussion for it already, but I don’t think it went
> to a vote. I don’t recall any technical impediment, just the lack of
> availability to finish it up. Although there is some association, it would
> be good to keep the KIPs separate.
> >
> > Thanks,
> > John
> >
> > On Sat, Jul 4, 2020, at 10:05, Richard Yu wrote:
> >> Hi all,
> >>
> >> This reminds me of a previous issue I think that we were discussing.
> >> @John Roesler <ma...@apache.org> I think you should remember
> this one.
> >>
> >> A while back, we were talking about having suppress operator emit
> >> records by wall-clock time instead of stream time.
> >> If we are adding this, wouldn't that make it more feasible for us to
> >> implement that feature for suppression?
> >>
> >> If I recall correctly, there actually had been quite a bit of user
> >> demand for such a feature.
> >> Might be good to include it in this KIP (or maybe use one of the prior
> >> KIPs for this feature).
> >>
> >> Best,
> >> Richard
> >>
> >> On Sat, Jul 4, 2020 at 6:58 AM John Roesler <vv...@apache.org>
> wrote:
> >>> Hi all,
> >>>
> >>>  1. Thanks, Boyang, it is nice to see usage examples in KIPs like
> this. It helps during the discussion, and it’s also good documentation
> later on.
> >>>
> >>>  2. Yeah, this is a subtle point. The motivation mentions being able
> to control the time during tests, but to be able to make it work, the
> processor implementation needs a public method on ProcessorContext to get
> the time. Otherwise, processors would have to check the type of the context
> and cast, depending on whether they’re running inside a test or not. In
> retrospect, if we’d had a usage example, this probably would have been
> clear.
> >>>
> >>>  3. I don’t think we expect people to have their own implementations
> of ProcessorContext. Since all implementations are internal, it’s really an
> implementation detail whether we use a default method, abstract methods, or
> concrete methods. I can’t think of an implementation that really wants to
> just look up the system time. In the production code path, we cache the
> time for performance, and in testing, we use a mock time.
> >>>
> >>>  Thanks,
> >>>  John
> >>>
> >>>
> >>>  On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
> >>>  > 1. Makes sense; let me propose something
> >>>  >
> >>>  > 2. That's not testing-only. The goal is to use the same API to
> access
> >>>  > the time
> >>>  > in deployment and testing environments. The major driver is
> >>>  > System.currentTimeMillis(),
> >>>  > which a) cannot be used in tests b) could go in specific cases back
> c)
> >>>  > is not compatible
> >>>  > with punctuator call. The idea is that we could access clock using
> >>>  > uniform API.
> >>>  > For completeness we should have the same API for system and stream time.
> >>>  >
> >>>  > 3. There aren't that many subclasses. Two important ones are
> >>>  > ProcessorContextImpl and
> >>>  > MockProcessorContext (and third one:
> >>>  > ForwardingDisableProcessorContext). If given
> >>>  > implementation does not support schedule() call, there is no reason
> to
> >>>  > support clock access.
> >>>  > The default implementation should just throw
> >>>  > UnsupportedOperationException just to prevent
> >>>  > from compilation errors in possible subclasses.
> >>>  >
> >>>  > On 2020/07/01 02:24:43, Boyang Chen <re...@gmail.com>
> wrote:
> >>>  > > Thanks Will for the KIP. A couple questions and suggestions:
> >>>  > >
> >>>  > > 1. I think for new APIs to make most sense, we should add a
> minimal example
> >>>  > > demonstrating how it could be useful to structure unit tests w/o
> the new
> >>>  > > APIs.
> >>>  > > 2. If this is a testing-only feature, could we only add it
> >>>  > > to MockProcessorContext?
> >>>  > > 3. Regarding the API, since this will be added to the
> ProcessorContext with
> >>>  > > many subclasses, does it make sense to provide default
> implementations as
> >>>  > > well?
> >>>  > >
> >>>  > > Boyang
> >>>  > >
> >>>  > > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <
> bottrellw@gmail.com>
> >>>  > > wrote:
> >>>  > >
> >>>  > > > Thanks, John! I made the change. How much longer should I let
> there be
> >>>  > > > discussion before starting a VOTE?
> >>>  > > >
> >>>  > > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler <
> vvcephei@apache.org> wrote:
> >>>  > > >
> >>>  > > > > Thanks, Will,
> >>>  > > > >
> >>>  > > > > That looks good to me. I would only add "cached" or something
> >>>  > > > > to indicate that it wouldn't just transparently look up the
> current
> >>>  > > > > System.currentTimeMillis every time.
> >>>  > > > >
> >>>  > > > > For example:
> >>>  > > > > /**
> >>>  > > > > * Returns current cached wall-clock system timestamp in
> milliseconds.
> >>>  > > > > *
> >>>  > > > > * @return the current cached wall-clock system timestamp in
> milliseconds
> >>>  > > > > */
> >>>  > > > > long currentSystemTimeMs();
> >>>  > > > >
> >>>  > > > > I don't want to give specific information about _when_
> exactly the
> >>>  > > > > timestamp cache will be updated, so that we can adjust it in
> the
> >>>  > > > > future, but it does seem important to make people aware that
> they
> >>>  > > > > won't see the timestamp advance during the execution of
> >>>  > > > > Processor.process(), for example.
> >>>  > > > >
> >>>  > > > > With that modification, I'll be +1 on this proposal.
> >>>  > > > >
> >>>  > > > > Thanks again for the KIP!
> >>>  > > > > -John
> >>>  > > > >
> >>>  > > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
> >>>  > > > > > Thanks, John! I appreciate you adjusting my lingo. I made
> the change to
> >>>  > > > > the
> >>>  > > > > > KIP. I will add the note about system time to the javadoc.
> >>>  > > > > >
> >>>  > > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <
> vvcephei@apache.org>
> >>>  > > > > wrote:
> >>>  > > > > >
> >>>  > > > > > > Hi Will,
> >>>  > > > > > >
> >>>  > > > > > > This proposal looks good to me overall. Thanks for the
> contribution!
> >>>  > > > > > >
> >>>  > > > > > > Just a couple of minor notes:
> >>>  > > > > > >
> >>>  > > > > > > The system time method would return a cached timestamp
> that Streams
> >>>  > > > > looks
> >>>  > > > > > > up once when it starts processing a record. This may be
> confusing, so
> >>>  > > > > it
> >>>  > > > > > > might be good to state it in the javadoc.
> >>>  > > > > > >
> >>>  > > > > > > I thought the javadoc for the stream time might be a bit
> confusing.
> >>>  > > > We
> >>>  > > > > > > normally talk about “Tasks” not “partition groups” in the
> public api.
> >>>  > > > > Maybe
> >>>  > > > > > > just saying that it’s “the maximum timestamp of any
> record yet
> >>>  > > > > processed by
> >>>  > > > > > > the task” would be both high level and accurate.
> >>>  > > > > > >
> >>>  > > > > > > Thanks again!
> >>>  > > > > > > -John
> >>>  > > > > > >
> >>>  > > > > > > On Mon, Jun 22, 2020, at 02:10, William Bottrell wrote:
> >>>  > > > > > > > Thanks, Bruno. I updated the KIP, so hopefully it makes
> more sense.
> >>>  > > > > > > Thanks
> >>>  > > > > > > > to Matthias J. Sax and Piotr Smolinski for helping with
> details.
> >>>  > > > > > > >
> >>>  > > > > > > > I welcome more feedback. Let me know if something
> doesn't make
> >>>  > > > sense
> >>>  > > > > or I
> >>>  > > > > > > > need to provide more detail. Also, feel free to
> enlighten me.
> >>>  > > > Thanks!
> >>>  > > > > > > >
> >>>  > > > > > > > On Thu, Jun 11, 2020 at 1:11 PM Bruno Cadonna <
> bruno@confluent.io>
> >>>  > > > > > > wrote:
> >>>  > > > > > > >
> >>>  > > > > > > > > Hi Will,
> >>>  > > > > > > > >
> >>>  > > > > > > > > Thank you for the KIP.
> >>>  > > > > > > > >
> >>>  > > > > > > > > 1. Could you elaborate a bit more on the motivation
> in the KIP?
> >>>  > > > An
> >>>  > > > > > > > > example would make the motivation clearer.
> >>>  > > > > > > > >
> >>>  > > > > > > > > 2. In section "Proposed Changes" you do not need to
> show the
> >>>  > > > > > > > > implementation and describe internals. A description
> of the
> >>>  > > > > expected
> >>>  > > > > > > > > behavior of the newly added methods should suffice.
> >>>  > > > > > > > >
> >>>  > > > > > > > > 3. In "Compatibility, Deprecation, and Migration
> Plan" you should
> >>>  > > > > > > > > state that the change is backward compatible because
> the two
> >>>  > > > > methods
> >>>  > > > > > > > > will be added and no other method will be changed or
> removed.
> >>>  > > > > > > > >
> >>>  > > > > > > > > Best,
> >>>  > > > > > > > > Bruno
> >>>  > > > > > > > >
> >>>  > > > > > > > > On Wed, Jun 10, 2020 at 10:06 AM William Bottrell <
> >>>  > > > > bottrellw@gmail.com
> >>>  > > > > > > >
> >>>  > > > > > > > > wrote:
> >>>  > > > > > > > > >
> >>>  > > > > > > > > > Add currentSystemTimeMs and currentStreamTimeMs to
> >>>  > > > > ProcessorContext
> >>>  > > > > > > > > > <
> >>>  > > > > > > > >
> >>>  > > > > > >
> >>>  > > > >
> >>>  > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext
> >>>  > > > > > > > > >
> >>>  > > > > > > > > >
> >>>  > > > > > > > > > I am extremely new to Kafka, but thank you to John
> Roesler and
> >>>  > > > > > > Matthias
> >>>  > > > > > > > > J.
> >>>  > > > > > > > > > Sax for pointing me in the right direction. I
> accept any and
> >>>  > > > all
> >>>  > > > > > > > > feedback.
> >>>  > > > > > > > > >
> >>>  > > > > > > > > > Thanks,
> >>>  > > > > > > > > > Will
> >>>  > > > > > > > >
> >>>  > > > > > > >
> >>>  > > > > > >
> >>>  > > > > >
> >>>  > > > >
> >>>  > > >
> >>>  > >
> >>>  >
>
>

Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Posted by "Matthias J. Sax" <mj...@apache.org>.
William,

thanks for the KIP. LGTM. Feel free to start a vote.


-Matthias


On 7/4/20 10:14 AM, John Roesler wrote:
> Hi Richard,
> 
> It’s good to hear from you!
> 
> Thanks for bringing up the wall-clock suppression feature. IIRC, someone actually started a KIP discussion for it already, but I don’t think it went to a vote. I don’t recall any technical impediment, just the lack of availability to finish it up. Although there is some association, it would be good to keep the KIPs separate.
> 
> Thanks,
> John
> 
> On Sat, Jul 4, 2020, at 10:05, Richard Yu wrote:
>> Hi all,
>>
>> This reminds me of a previous issue I think that we were discussing.
>> @John Roesler <ma...@apache.org> I think you should remember this one.
>>
>> A while back, we were talking about having suppress operator emit 
>> records by wall-clock time instead of stream time.
>> If we are adding this, wouldn't that make it more feasible for us to 
>> implement that feature for suppression?
>>
>> If I recall correctly, there actually had been quite a bit of user 
>> demand for such a feature.
>> Might be good to include it in this KIP (or maybe use one of the prior 
>> KIPs for this feature).
>>
>> Best,
>> Richard
>>
>> On Sat, Jul 4, 2020 at 6:58 AM John Roesler <vv...@apache.org> wrote:
>>> Hi all,
>>>
>>>  1. Thanks, Boyang, it is nice to see usage examples in KIPs like this. It helps during the discussion, and it’s also good documentation later on. 
>>>
>>>  2. Yeah, this is a subtle point. The motivation mentions being able to control the time during tests, but to be able to make it work, the processor implementation needs a public method on ProcessorContext to get the time. Otherwise, processors would have to check the type of the context and cast, depending on whether they’re running inside a test or not. In retrospect, if we’d had a usage example, this probably would have been clear. 
>>>
>>>  3. I don’t think we expect people to have their own implementations of ProcessorContext. Since all implementations are internal, it’s really an implementation detail whether we use a default method, abstract methods, or concrete methods. I can’t think of an implementation that really wants to just look up the system time. In the production code path, we cache the time for performance, and in testing, we use a mock time. 
>>>
>>>  Thanks,
>>>  John
>>>
>>>
>>>  On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
>>>  > 1. Makes sense; let me propose something
>>>  > 
>>>  > 2. That's not testing-only. The goal is to use the same API to access 
>>>  > the time
>>>  > in deployment and testing environments. The major driver is 
>>>  > System.currentTimeMillis(),
>>>  > which a) cannot be used in tests b) could go in specific cases back c) 
>>>  > is not compatible
>>>  > with punctuator call. The idea is that we could access clock using 
>>>  > uniform API. 
>>>  > For completeness we should have same API for system and stream time.
>>>  > 
>>>  > 3. There aren't that many subclasses. Two important ones are 
>>>  > ProcessorContextImpl and 
>>>  > MockProcessorContext (and third one: 
>>>  > ForwardingDisableProcessorContext). If given
>>>  > implementation does not support schedule() call, there is no reason to 
>>>  > support clock access. 
>>>  > The default implementation should just throw 
>>>  > UnsupportedOperationException just to prevent
>>>  > from compilation errors in possible subclasses.
>>>  > 
>>>  > On 2020/07/01 02:24:43, Boyang Chen <re...@gmail.com> wrote: 
>>>  > > Thanks Will for the KIP. A couple questions and suggestions:
>>>  > > 
>>>  > > 1. I think for new APIs to make most sense, we should add a minimal example
>>>  > > demonstrating how it could be useful to structure unit tests w/o the new
>>>  > > APIs.
>>>  > > 2. If this is a testing-only feature, could we only add it
>>>  > > to MockProcessorContext?
>>>  > > 3. Regarding the API, since this will be added to the ProcessorContext with
>>>  > > many subclasses, does it make sense to provide default implementations as
>>>  > > well?
>>>  > > 
>>>  > > Boyang
>>>  > > 
>>>  > > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <bo...@gmail.com>
>>>  > > wrote:
>>>  > > 
>>>  > > > Thanks, John! I made the change. How much longer should I let there be
>>>  > > > discussion before starting a VOTE?
>>>  > > >
>>>  > > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler <vv...@apache.org> wrote:
>>>  > > >
>>>  > > > > Thanks, Will,
>>>  > > > >
>>>  > > > > That looks good to me. I would only add "cached" or something
>>>  > > > > to indicate that it wouldn't just transparently look up the current
>>>  > > > > System.currentTimeMillis every time.
>>>  > > > >
>>>  > > > > For example:
>>>  > > > > /**
>>>  > > > > * Returns current cached wall-clock system timestamp in milliseconds.
>>>  > > > > *
>>>  > > > > * @return the current cached wall-clock system timestamp in milliseconds
>>>  > > > > */
>>>  > > > > long currentSystemTimeMs();
>>>  > > > >
>>>  > > > > I don't want to give specific information about _when_ exactly the
>>>  > > > > timestamp cache will be updated, so that we can adjust it in the
>>>  > > > > future, but it does seem important to make people aware that they
>>>  > > > > won't see the timestamp advance during the execution of
>>>  > > > > Processor.process(), for example.
>>>  > > > >
>>>  > > > > With that modification, I'll be +1 on this proposal.
>>>  > > > >
>>>  > > > > Thanks again for the KIP!
>>>  > > > > -John
>>>  > > > >
>>>  > > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
>>>  > > > > > Thanks, John! I appreciate you adjusting my lingo. I made the change to
>>>  > > > > the
>>>  > > > > > KIP. I will add the note about system time to the javadoc.
>>>  > > > > >
>>>  > > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <vv...@apache.org>
>>>  > > > > wrote:
>>>  > > > > >
>>>  > > > > > > Hi Will,
>>>  > > > > > >
>>>  > > > > > > This proposal looks good to me overall. Thanks for the contribution!
>>>  > > > > > >
>>>  > > > > > > Just a couple of minor notes:
>>>  > > > > > >
>>>  > > > > > > The system time method would return a cached timestamp that Streams
>>>  > > > > looks
>>>  > > > > > > up once when it starts processing a record. This may be confusing, so
>>>  > > > > it
>>>  > > > > > > might be good to state it in the javadoc.
>>>  > > > > > >
>>>  > > > > > > I thought the javadoc for the stream time might be a bit confusing.
>>>  > > > We
>>>  > > > > > > normally talk about “Tasks” not “partition groups” in the public api.
>>>  > > > > Maybe
>>>  > > > > > > just saying that it’s “the maximum timestamp of any record yet
>>>  > > > > processed by
>>>  > > > > > > the task” would be both high level and accurate.
>>>  > > > > > >
>>>  > > > > > > Thanks again!
>>>  > > > > > > -John
>>>  > > > > > >
>>>  > > > > > > On Mon, Jun 22, 2020, at 02:10, William Bottrell wrote:
>>>  > > > > > > > Thanks, Bruno. I updated the KIP, so hopefully it makes more sense.
>>>  > > > > > > Thanks
>>>  > > > > > > > to Matthias J. Sax and Piotr Smolinski for helping with details.
>>>  > > > > > > >
>>>  > > > > > > > I welcome more feedback. Let me know if something doesn't make
>>>  > > > sense
>>>  > > > > or I
>>>  > > > > > > > need to provide more detail. Also, feel free to enlighten me.
>>>  > > > Thanks!
>>>  > > > > > > >
>>>  > > > > > > > On Thu, Jun 11, 2020 at 1:11 PM Bruno Cadonna <br...@confluent.io>
>>>  > > > > > > wrote:
>>>  > > > > > > >
>>>  > > > > > > > > Hi Will,
>>>  > > > > > > > >
>>>  > > > > > > > > Thank you for the KIP.
>>>  > > > > > > > >
>>>  > > > > > > > > 1. Could you elaborate a bit more on the motivation in the KIP?
>>>  > > > An
>>>  > > > > > > > > example would make the motivation clearer.
>>>  > > > > > > > >
>>>  > > > > > > > > 2. In section "Proposed Changes" you do not need to show the
>>>  > > > > > > > > implementation and describe internals. A description of the
>>>  > > > > expected
>>>  > > > > > > > > behavior of the newly added methods should suffice.
>>>  > > > > > > > >
>>>  > > > > > > > > 3. In "Compatibility, Deprecation, and Migration Plan" you should
>>>  > > > > > > > > state that the change is backward compatible because the two
>>>  > > > > methods
>>>  > > > > > > > > will be added and no other method will be changed or removed.
>>>  > > > > > > > >
>>>  > > > > > > > > Best,
>>>  > > > > > > > > Bruno
>>>  > > > > > > > >
>>>  > > > > > > > > On Wed, Jun 10, 2020 at 10:06 AM William Bottrell <
>>>  > > > > bottrellw@gmail.com
>>>  > > > > > > >
>>>  > > > > > > > > wrote:
>>>  > > > > > > > > >
>>>  > > > > > > > > > Add currentSystemTimeMs and currentStreamTimeMs to
>>>  > > > > ProcessorContext
>>>  > > > > > > > > > <
>>>  > > > > > > > >
>>>  > > > > > >
>>>  > > > >
>>>  > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext
>>>  > > > > > > > > >
>>>  > > > > > > > > >
>>>  > > > > > > > > > I am extremely new to Kafka, but thank you to John Roesler and
>>>  > > > > > > Matthias
>>>  > > > > > > > > J.
>>>  > > > > > > > > > Sax for pointing me in the right direction. I accept any and
>>>  > > > all
>>>  > > > > > > > > feedback.
>>>  > > > > > > > > >
>>>  > > > > > > > > > Thanks,
>>>  > > > > > > > > > Will
>>>  > > > > > > > >
>>>  > > > > > > >
>>>  > > > > > >
>>>  > > > > >
>>>  > > > >
>>>  > > >
>>>  > > 
>>>  >


Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Posted by John Roesler <vv...@apache.org>.
Hi Richard,

It’s good to hear from you!

Thanks for bringing up the wall-clock suppression feature. IIRC, someone actually started a KIP discussion for it already, but I don’t think it went to a vote. I don’t recall any technical impediment, just the lack of availability to finish it up. Although there is some association, it would be good to keep the KIPs separate.

Thanks,
John

On Sat, Jul 4, 2020, at 10:05, Richard Yu wrote:
> Hi all,
> 
> This reminds me of a previous issue I think that we were discussing.
> @John Roesler <ma...@apache.org> I think you should remember this one.
> 
> A while back, we were talking about having suppress operator emit 
> records by wall-clock time instead of stream time.
> If we are adding this, wouldn't that make it more feasible for us to 
> implement that feature for suppression?
> 
> If I recall correctly, there actually had been quite a bit of user 
> demand for such a feature.
> Might be good to include it in this KIP (or maybe use one of the prior 
> KIPs for this feature).
> 
> Best,
> Richard
> 
> On Sat, Jul 4, 2020 at 6:58 AM John Roesler <vv...@apache.org> wrote:
> > Hi all,
> > 
> >  1. Thanks, Boyang, it is nice to see usage examples in KIPs like this. It helps during the discussion, and it’s also good documentation later on. 
> > 
> >  2. Yeah, this is a subtle point. The motivation mentions being able to control the time during tests, but to be able to make it work, the processor implementation needs a public method on ProcessorContext to get the time. Otherwise, processors would have to check the type of the context and cast, depending on whether they’re running inside a test or not. In retrospect, if we’d had a usage example, this probably would have been clear. 
> > 
> >  3. I don’t think we expect people to have their own implementations of ProcessorContext. Since all implementations are internal, it’s really an implementation detail whether we use a default method, abstract methods, or concrete methods. I can’t think of an implementation that really wants to just look up the system time. In the production code path, we cache the time for performance, and in testing, we use a mock time. 
> > 
> >  Thanks,
> >  John
> > 
> > 
> >  On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
> >  > 1. Makes sense; let me propose something
> >  > 
> >  > 2. That's not testing-only. The goal is to use the same API to access 
> >  > the time
> >  > in deployment and testing environments. The major driver is 
> >  > System.currentTimeMillis(),
> >  > which a) cannot be used in tests b) could go in specific cases back c) 
> >  > is not compatible
> >  > with punctuator call. The idea is that we could access clock using 
> >  > uniform API. 
> >  > For completeness we should have same API for system and stream time.
> >  > 
> >  > 3. There aren't that many subclasses. Two important ones are 
> >  > ProcessorContextImpl and 
> >  > MockProcessorContext (and third one: 
> >  > ForwardingDisableProcessorContext). If given
> >  > implementation does not support schedule() call, there is no reason to 
> >  > support clock access. 
> >  > The default implementation should just throw 
> >  > UnsupportedOperationException just to prevent
> >  > from compilation errors in possible subclasses.
> >  > 
> >  > On 2020/07/01 02:24:43, Boyang Chen <re...@gmail.com> wrote: 
> >  > > Thanks Will for the KIP. A couple questions and suggestions:
> >  > > 
> >  > > 1. I think for new APIs to make most sense, we should add a minimal example
> >  > > demonstrating how it could be useful to structure unit tests w/o the new
> >  > > APIs.
> >  > > 2. If this is a testing-only feature, could we only add it
> >  > > to MockProcessorContext?
> >  > > 3. Regarding the API, since this will be added to the ProcessorContext with
> >  > > many subclasses, does it make sense to provide default implementations as
> >  > > well?
> >  > > 
> >  > > Boyang
> >  > > 
> >  > > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <bo...@gmail.com>
> >  > > wrote:
> >  > > 
> >  > > > Thanks, John! I made the change. How much longer should I let there be
> >  > > > discussion before starting a VOTE?
> >  > > >
> >  > > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler <vv...@apache.org> wrote:
> >  > > >
> >  > > > > Thanks, Will,
> >  > > > >
> >  > > > > That looks good to me. I would only add "cached" or something
> >  > > > > to indicate that it wouldn't just transparently look up the current
> >  > > > > System.currentTimeMillis every time.
> >  > > > >
> >  > > > > For example:
> >  > > > > /**
> >  > > > > * Returns current cached wall-clock system timestamp in milliseconds.
> >  > > > > *
> >  > > > > * @return the current cached wall-clock system timestamp in milliseconds
> >  > > > > */
> >  > > > > long currentSystemTimeMs();
> >  > > > >
> >  > > > > I don't want to give specific information about _when_ exactly the
> >  > > > > timestamp cache will be updated, so that we can adjust it in the
> >  > > > > future, but it does seem important to make people aware that they
> >  > > > > won't see the timestamp advance during the execution of
> >  > > > > Processor.process(), for example.
> >  > > > >
> >  > > > > With that modification, I'll be +1 on this proposal.
> >  > > > >
> >  > > > > Thanks again for the KIP!
> >  > > > > -John
> >  > > > >
> >  > > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
> >  > > > > > Thanks, John! I appreciate you adjusting my lingo. I made the change to
> >  > > > > the
> >  > > > > > KIP. I will add the note about system time to the javadoc.
> >  > > > > >
> >  > > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <vv...@apache.org>
> >  > > > > wrote:
> >  > > > > >
> >  > > > > > > Hi Will,
> >  > > > > > >
> >  > > > > > > This proposal looks good to me overall. Thanks for the contribution!
> >  > > > > > >
> >  > > > > > > Just a couple of minor notes:
> >  > > > > > >
> >  > > > > > > The system time method would return a cached timestamp that Streams
> >  > > > > looks
> >  > > > > > > up once when it starts processing a record. This may be confusing, so
> >  > > > > it
> >  > > > > > > might be good to state it in the javadoc.
> >  > > > > > >
> >  > > > > > > I thought the javadoc for the stream time might be a bit confusing.
> >  > > > We
> >  > > > > > > normally talk about “Tasks” not “partition groups” in the public api.
> >  > > > > Maybe
> >  > > > > > > just saying that it’s “the maximum timestamp of any record yet
> >  > > > > processed by
> >  > > > > > > the task” would be both high level and accurate.
> >  > > > > > >
> >  > > > > > > Thanks again!
> >  > > > > > > -John
> >  > > > > > >
> >  > > > > > > On Mon, Jun 22, 2020, at 02:10, William Bottrell wrote:
> >  > > > > > > > Thanks, Bruno. I updated the KIP, so hopefully it makes more sense.
> >  > > > > > > Thanks
> >  > > > > > > > to Matthias J. Sax and Piotr Smolinski for helping with details.
> >  > > > > > > >
> >  > > > > > > > I welcome more feedback. Let me know if something doesn't make
> >  > > > sense
> >  > > > > or I
> >  > > > > > > > need to provide more detail. Also, feel free to enlighten me.
> >  > > > Thanks!
> >  > > > > > > >
> >  > > > > > > > On Thu, Jun 11, 2020 at 1:11 PM Bruno Cadonna <br...@confluent.io>
> >  > > > > > > wrote:
> >  > > > > > > >
> >  > > > > > > > > Hi Will,
> >  > > > > > > > >
> >  > > > > > > > > Thank you for the KIP.
> >  > > > > > > > >
> >  > > > > > > > > 1. Could you elaborate a bit more on the motivation in the KIP?
> >  > > > An
> >  > > > > > > > > example would make the motivation clearer.
> >  > > > > > > > >
> >  > > > > > > > > 2. In section "Proposed Changes" you do not need to show the
> >  > > > > > > > > implementation and describe internals. A description of the
> >  > > > > expected
> >  > > > > > > > > behavior of the newly added methods should suffice.
> >  > > > > > > > >
> >  > > > > > > > > 3. In "Compatibility, Deprecation, and Migration Plan" you should
> >  > > > > > > > > state that the change is backward compatible because the two
> >  > > > > methods
> >  > > > > > > > > will be added and no other method will be changed or removed.
> >  > > > > > > > >
> >  > > > > > > > > Best,
> >  > > > > > > > > Bruno
> >  > > > > > > > >
> >  > > > > > > > > On Wed, Jun 10, 2020 at 10:06 AM William Bottrell <
> >  > > > > bottrellw@gmail.com
> >  > > > > > > >
> >  > > > > > > > > wrote:
> >  > > > > > > > > >
> >  > > > > > > > > > Add currentSystemTimeMs and currentStreamTimeMs to
> >  > > > > ProcessorContext
> >  > > > > > > > > > <
> >  > > > > > > > >
> >  > > > > > >
> >  > > > >
> >  > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext
> >  > > > > > > > > >
> >  > > > > > > > > >
> >  > > > > > > > > > I am extremely new to Kafka, but thank you to John Roesler and
> >  > > > > > > Matthias
> >  > > > > > > > > J.
> >  > > > > > > > > > Sax for pointing me in the right direction. I accept any and
> >  > > > all
> >  > > > > > > > > feedback.
> >  > > > > > > > > >
> >  > > > > > > > > > Thanks,
> >  > > > > > > > > > Will
> >  > > > > > > > >
> >  > > > > > > >
> >  > > > > > >
> >  > > > > >
> >  > > > >
> >  > > >
> >  > > 
> >  >

Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Posted by Richard Yu <yo...@gmail.com>.
Hi all,

This reminds me of a previous issue that I think we were discussing.
@John Roesler <vv...@apache.org> I think you should remember this one.

A while back, we were talking about having suppress operator emit records
by wall-clock time instead of stream time.
If we are adding this, wouldn't that make it more feasible for us to
implement that feature for suppression?

If I recall correctly, there actually had been quite a bit of user demand
for such a feature.
Might be good to include it in this KIP (or maybe use one of the prior KIPs
for this feature).
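
To illustrate why the clock source matters for suppression, here is a toy
sketch. It is not the suppress operator's actual buffer: HoldUntil is a
hypothetical helper, and the timestamps are made up. It only shows that a
deadline checked against stream time cannot pass while no new records
arrive, whereas the same deadline checked against wall-clock time can.

```java
public class SuppressionClockSketch {

    /** Hypothetical one-slot buffer entry that is held until a deadline passes. */
    static final class HoldUntil {
        private final long deadlineMs;

        HoldUntil(long bufferedAtMs, long holdMs) {
            this.deadlineMs = bufferedAtMs + holdMs;
        }

        /** True once the supplied clock reading has reached the deadline. */
        boolean readyToEmit(long nowMs) {
            return nowMs >= deadlineMs;
        }
    }

    public static void main(String[] args) {
        // Assumption: the record was buffered when both clocks read 1_000 ms.
        HoldUntil entry = new HoldUntil(1_000L, 500L);

        // No further records arrive, so stream time stays where it was,
        // while the wall clock keeps moving.
        long streamTimeMs = 1_000L;
        long wallClockMs = 1_600L;

        System.out.println("emit by stream time: " + entry.readyToEmit(streamTimeMs));
        System.out.println("emit by wall clock:  " + entry.readyToEmit(wallClockMs));
    }
}
```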

Best,
Richard

On Sat, Jul 4, 2020 at 6:58 AM John Roesler <vv...@apache.org> wrote:

> Hi all,
>
> 1. Thanks, Boyang, it is nice to see usage examples in KIPs like this. It
> helps during the discussion, and it’s also good documentation later on.
>
> 2. Yeah, this is a subtle point. The motivation mentions being able to
> control the time during tests, but to be able to make it work, the
> processor implementation needs a public method on ProcessorContext to get
> the time. Otherwise, processors would have to check the type of the context
> and cast, depending on whether they’re running inside a test or not. In
> retrospect, if we’d had a usage example, this probably would have been
> clear.
>
> 3. I don’t think we expect people to have their own implementations of
> ProcessorContext. Since all implementations are internal, it’s really an
> implementation detail whether we use a default method, abstract methods, or
> concrete methods. I can’t think of an implementation that really wants to
> just look up the system time. In the production code path, we cache the
> time for performance, and in testing, we use a mock time.
>
> Thanks,
> John
>
>
> On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
> > 1. Makes sense; let me propose something
> >
> > 2. That's not testing-only. The goal is to use the same API to access
> > the time
> > in deployment and testing environments. The major driver is
> > System.currentTimeMillis(),
> > which a) cannot be used in tests b) could go in specific cases back c)
> > is not compatible
> > with punctuator call. The idea is that we could access clock using
> > uniform API.
> > For completeness we should have same API for system and stream time.
> >
> > 3. There aren't that many subclasses. Two important ones are
> > ProcessorContextImpl and
> > MockProcessorContext (and third one:
> > ForwardingDisableProcessorContext). If given
> > implementation does not support schedule() call, there is no reason to
> > support clock access.
> > The default implementation should just throw
> > UnsupportedOperationException just to prevent
> > from compilation errors in possible subclasses.
> >
> > On 2020/07/01 02:24:43, Boyang Chen <re...@gmail.com> wrote:
> > > Thanks Will for the KIP. A couple questions and suggestions:
> > >
> > > 1. I think for new APIs to make most sense, we should add a minimal
> example
> > > demonstrating how it could be useful to structure unit tests w/o the
> new
> > > APIs.
> > > 2. If this is a testing-only feature, could we only add it
> > > to MockProcessorContext?
> > > 3. Regarding the API, since this will be added to the ProcessorContext
> with
> > > many subclasses, does it make sense to provide default implementations
> as
> > > well?
> > >
> > > Boyang
> > >
> > > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <bo...@gmail.com>
> > > wrote:
> > >
> > > > Thanks, John! I made the change. How much longer should I let there
> be
> > > > discussion before starting a VOTE?
> > > >
> > > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler <vv...@apache.org>
> wrote:
> > > >
> > > > > Thanks, Will,
> > > > >
> > > > > That looks good to me. I would only add "cached" or something
> > > > > to indicate that it wouldn't just transparently look up the current
> > > > > System.currentTimeMillis every time.
> > > > >
> > > > > For example:
> > > > > /**
> > > > >  * Returns current cached wall-clock system timestamp in
> milliseconds.
> > > > >  *
> > > > >  * @return the current cached wall-clock system timestamp in
> milliseconds
> > > > >  */
> > > > > long currentSystemTimeMs();
> > > > >
> > > > > I don't want to give specific information about _when_ exactly the
> > > > > timestamp cache will be updated, so that we can adjust it in the
> > > > > future, but it does seem important to make people aware that they
> > > > > won't see the timestamp advance during the execution of
> > > > > Processor.process(), for example.
> > > > >
> > > > > With that modification, I'll be +1 on this proposal.
> > > > >
> > > > > Thanks again for the KIP!
> > > > > -John
> > > > >
> > > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
> > > > > > Thanks, John! I appreciate you adjusting my lingo. I made the
> change to
> > > > > the
> > > > > > KIP. I will add the note about system time to the javadoc.
> > > > > >
> > > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <
> vvcephei@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Will,
> > > > > > >
> > > > > > > This proposal looks good to me overall. Thanks for the
> contribution!
> > > > > > >
> > > > > > > Just a couple of minor notes:
> > > > > > >
> > > > > > > The system time method would return a cached timestamp that
> Streams
> > > > > looks
> > > > > > > up once when it starts processing a record. This may be
> confusing, so
> > > > > it
> > > > > > > might be good to state it in the javadoc.
> > > > > > >
> > > > > > > I thought the javadoc for the stream time might be a bit
> confusing.
> > > > We
> > > > > > > normally talk about “Tasks” not “partition groups” in the
> public api.
> > > > > Maybe
> > > > > > > just saying that it’s “the maximum timestamp of any record yet
> > > > > processed by
> > > > > > > the task” would be both high level and accurate.
> > > > > > >
> > > > > > > Thanks again!
> > > > > > > -John
> > > > > > >
> > > > > > > On Mon, Jun 22, 2020, at 02:10, William Bottrell wrote:
> > > > > > > > Thanks, Bruno. I updated the KIP, so hopefully it makes more
> sense.
> > > > > > > Thanks
> > > > > > > > to Matthias J. Sax and Piotr Smolinski for helping with
> details.
> > > > > > > >
> > > > > > > > I welcome more feedback. Let me know if something doesn't
> make
> > > > sense
> > > > > or I
> > > > > > > > need to provide more detail. Also, feel free to enlighten me.
> > > > Thanks!
> > > > > > > >
> > > > > > > > On Thu, Jun 11, 2020 at 1:11 PM Bruno Cadonna <
> bruno@confluent.io>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Will,
> > > > > > > > >
> > > > > > > > > Thank you for the KIP.
> > > > > > > > >
> > > > > > > > > 1. Could you elaborate a bit more on the motivation in the
> KIP?
> > > > An
> > > > > > > > > example would make the motivation clearer.
> > > > > > > > >
> > > > > > > > > 2. In section "Proposed Changes" you do not need to show
> the
> > > > > > > > > implementation and describe internals. A description of the
> > > > > expected
> > > > > > > > > behavior of the newly added methods should suffice.
> > > > > > > > >
> > > > > > > > > 3. In "Compatibility, Deprecation, and Migration Plan" you
> should
> > > > > > > > > state that the change is backward compatible because the
> two
> > > > > methods
> > > > > > > > > will be added and no other method will be changed or
> removed.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Bruno
> > > > > > > > >
> > > > > > > > > On Wed, Jun 10, 2020 at 10:06 AM William Bottrell <
> > > > > bottrellw@gmail.com
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Add currentSystemTimeMs and currentStreamTimeMs to
> > > > > ProcessorContext
> > > > > > > > > > <
> > > > > > > > >
> > > > > > >
> > > > >
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > I am extremely new to Kafka, but thank you to John
> Roesler and
> > > > > > > Matthias
> > > > > > > > > J.
> > > > > > > > > > Sax for pointing me in the right direction. I accept any
> and
> > > > all
> > > > > > > > > feedback.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Will
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Posted by John Roesler <vv...@apache.org>.
Hi all,

1. Thanks, Boyang, it is nice to see usage examples in KIPs like this. It helps during the discussion, and it’s also good documentation later on. 

2. Yeah, this is a subtle point. The motivation mentions being able to control the time during tests, but to be able to make it work, the processor implementation needs a public method on ProcessorContext to get the time. Otherwise, processors would have to check the type of the context and cast, depending on whether they’re running inside a test or not. In retrospect, if we’d had a usage example, this probably would have been clear.

3. I don’t think we expect people to have their own implementations of ProcessorContext. Since all implementations are internal, it’s really an implementation detail whether we use a default method, abstract methods, or concrete methods. I can’t think of an implementation that really wants to just look up the system time. In the production code path, we cache the time for performance, and in testing, we use a mock time. 
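
To make points 2 and 3 a bit more concrete, here is a rough sketch of the usage pattern. It is not taken from the KIP and does not use the real Kafka Streams classes: ClockContext, CachingContext, MockContext, and StalenessChecker are hypothetical stand-ins for ProcessorContext, the production implementation, MockProcessorContext, and a user processor. Only the method names currentSystemTimeMs and currentStreamTimeMs come from the proposal, and refreshing the cache once per record is an assumption made for illustration.

```java
import java.util.function.LongSupplier;

public class ClockAccessSketch {

    /** Hypothetical stand-in for the two methods proposed for ProcessorContext. */
    interface ClockContext {
        long currentSystemTimeMs();
        long currentStreamTimeMs();
    }

    /** Production-like context: reads the wall clock once per record and caches it. */
    static final class CachingContext implements ClockContext {
        private final LongSupplier wallClock;
        private long cachedSystemTimeMs;
        private long streamTimeMs = Long.MIN_VALUE;

        CachingContext(LongSupplier wallClock) {
            this.wallClock = wallClock;
        }

        /** Assumed to be called by the runtime before each record is processed. */
        void startRecord(long recordTimestampMs) {
            cachedSystemTimeMs = wallClock.getAsLong();
            // Stream time: the maximum timestamp of any record seen so far.
            streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        }

        @Override
        public long currentSystemTimeMs() {
            return cachedSystemTimeMs;
        }

        @Override
        public long currentStreamTimeMs() {
            return streamTimeMs;
        }
    }

    /** Test context: both clocks are set explicitly by the test. */
    static final class MockContext implements ClockContext {
        private long systemTimeMs;
        private long streamTimeMs;

        void setCurrentSystemTimeMs(long ms) {
            systemTimeMs = ms;
        }

        void setCurrentStreamTimeMs(long ms) {
            streamTimeMs = ms;
        }

        @Override
        public long currentSystemTimeMs() {
            return systemTimeMs;
        }

        @Override
        public long currentStreamTimeMs() {
            return streamTimeMs;
        }
    }

    /** User logic: depends only on ClockContext, so no type check or cast is needed. */
    static final class StalenessChecker {
        private final ClockContext context;
        private final long maxLagMs;

        StalenessChecker(ClockContext context, long maxLagMs) {
            this.context = context;
            this.maxLagMs = maxLagMs;
        }

        boolean isStale(long recordTimestampMs) {
            return context.currentSystemTimeMs() - recordTimestampMs > maxLagMs;
        }
    }

    public static void main(String[] args) {
        // In a test, the wall clock is whatever the test says it is.
        MockContext mock = new MockContext();
        mock.setCurrentSystemTimeMs(10_000L);
        StalenessChecker checker = new StalenessChecker(mock, 5_000L);
        System.out.println("old record stale?    " + checker.isStale(1_000L));
        System.out.println("recent record stale? " + checker.isStale(8_000L));
    }
}
```

The same StalenessChecker would run unchanged against CachingContext, which is the point of exposing the clock on the context itself rather than only on the mock.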

Thanks,
John


On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
> 1. Makes sense; let me propose something
> 
> 2. That's not testing-only. The goal is to use the same API to access 
> the time
> in deployment and testing environments. The major driver is 
> System.currentTimeMillis(),
> which a) cannot be used in tests b) could go in specific cases back c) 
> is not compatible
> with punctuator call. The idea is that we could access clock using 
> uniform API. 
> For completness we should have same API for system and stream time.
> 
> 3. There aren't that many subclasses. Two important ones are 
> ProcessorContextImpl and 
> MockProcessorContext (and third one: 
> ForwardingDisableProcessorContext). If given
> implementation does not support schedule() call, there is no reason to 
> support clock access. 
> The default implementation should just throw 
> UnsupportedOperationException just to prevent
> from compilation errors in possible subclasses.
> 
> On 2020/07/01 02:24:43, Boyang Chen <re...@gmail.com> wrote: 
> > Thanks Will for the KIP. A couple questions and suggestions:
> > 
> > 1. I think for new APIs to make most sense, we should add a minimal example
> > demonstrating how it could be useful to structure unit tests w/o the new
> > APIs.
> > 2. If this is a testing-only feature, could we only add it
> > to MockProcessorContext?
> > 3. Regarding the API, since this will be added to the ProcessorContext with
> > many subclasses, does it make sense to provide default implementations as
> > well?
> > 
> > Boyang
> > 
> > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <bo...@gmail.com>
> > wrote:
> > 
> > > Thanks, John! I made the change. How much longer should I let there be
> > > discussion before starting a VOTE?
> > >
> > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler <vv...@apache.org> wrote:
> > >
> > > > Thanks, Will,
> > > >
> > > > That looks good to me. I would only add "cached" or something
> > > > to indicate that it wouldn't just transparently look up the current
> > > > System.currentTimeMillis every time.
> > > >
> > > > For example:
> > > > /**
> > > >  * Returns current cached wall-clock system timestamp in milliseconds.
> > > >  *
> > > >  * @return the current cached wall-clock system timestamp in milliseconds
> > > >  */
> > > > long currentSystemTimeMs();
> > > >
> > > > I don't want to give specific information about _when_ exactly the
> > > > timestamp cache will be updated, so that we can adjust it in the
> > > > future, but it does seem important to make people aware that they
> > > > won't see the timestamp advance during the execution of
> > > > Processor.process(), for example.
> > > >
> > > > With that modification, I'll be +1 on this proposal.
> > > >
> > > > Thanks again for the KIP!
> > > > -John
> > > >
> > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
> > > > > Thanks, John! I appreciate you adjusting my lingo. I made the change to
> > > > the
> > > > > KIP. I will add the note about system time to the javadoc.
> > > > >
> > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <vv...@apache.org>
> > > > wrote:
> > > > >
> > > > > > Hi Will,
> > > > > >
> > > > > > This proposal looks good to me overall. Thanks for the contribution!
> > > > > >
> > > > > > Just a couple of minor notes:
> > > > > >
> > > > > > The system time method would return a cached timestamp that Streams
> > > > looks
> > > > > > up once when it starts processing a record. This may be confusing, so
> > > > it
> > > > > > might be good to state it in the javadoc.
> > > > > >
> > > > > > I thought the javadoc for the stream time might be a bit confusing.
> > > We
> > > > > > normally talk about “Tasks” not “partition groups” in the public api.
> > > > Maybe
> > > > > > just saying that it’s “the maximum timestamp of any record yet
> > > > processed by
> > > > > > the task” would be both high level and accurate.
> > > > > >
> > > > > > Thanks again!
> > > > > > -John
> > > > > >
> > > > > > On Mon, Jun 22, 2020, at 02:10, William Bottrell wrote:
> > > > > > > Thanks, Bruno. I updated the KIP, so hopefully it makes more sense.
> > > > > > Thanks
> > > > > > > to Matthias J. Sax and Piotr Smolinski for helping with details.
> > > > > > >
> > > > > > > I welcome more feedback. Let me know if something doesn't make
> > > sense
> > > > or I
> > > > > > > need to provide more detail. Also, feel free to enlighten me.
> > > Thanks!
> > > > > > >
> > > > > > > On Thu, Jun 11, 2020 at 1:11 PM Bruno Cadonna <br...@confluent.io>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Will,
> > > > > > > >
> > > > > > > > Thank you for the KIP.
> > > > > > > >
> > > > > > > > 1. Could you elaborate a bit more on the motivation in the KIP?
> > > An
> > > > > > > > example would make the motivation clearer.
> > > > > > > >
> > > > > > > > 2. In section "Proposed Changes" you do not need to show the
> > > > > > > > implementation and describe internals. A description of the
> > > > expected
> > > > > > > > behavior of the newly added methods should suffice.
> > > > > > > >
> > > > > > > > 3. In "Compatibility, Deprecation, and Migration Plan" you should
> > > > > > > > state that the change is backward compatible because the two
> > > > methods
> > > > > > > > will be added and no other method will be changed or removed.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Bruno
> > > > > > > >
> > > > > > > > On Wed, Jun 10, 2020 at 10:06 AM William Bottrell <
> > > > bottrellw@gmail.com
> > > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > Add currentSystemTimeMs and currentStreamTimeMs to
> > > > ProcessorContext
> > > > > > > > > <
> > > > > > > >
> > > > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > I am extremely new to Kafka, but thank you to John Roesler and
> > > > > > Matthias
> > > > > > > > J.
> > > > > > > > > Sax for pointing me in the right direction. I accept any and
> > > all
> > > > > > > > feedback.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Will
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > 
>

Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Posted by Piotr Smoliński <pi...@gmail.com>.
1. Makes sense; let me propose something

2. That's not testing-only. The goal is to use the same API to access the time
in deployment and testing environments. The major driver is System.currentTimeMillis(),
which a) cannot be used in tests, b) can go backwards in specific cases, and c) is not
compatible with the punctuator call. The idea is that we could access the clock using a
uniform API. For completeness we should have the same API for system and stream time.

3. There aren't that many subclasses. The two important ones are ProcessorContextImpl and
MockProcessorContext (and a third one: ForwardingDisableProcessorContext). If a given
implementation does not support the schedule() call, there is no reason for it to support
clock access. The default implementation should just throw UnsupportedOperationException,
simply to prevent compilation errors in possible subclasses.
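
(Editorial sketch, not part of the original message.) The default-method approach in point 3 could look roughly like the following. Context, LegacyContext and ClockContext are hypothetical names used only for illustration; the forward(String) method is a simplified placeholder and not the real ProcessorContext signature.

```java
// Hypothetical stand-ins for illustration only; these are NOT the Kafka Streams types.
final class DefaultMethodSketch {

    interface Context {
        /** Simplified placeholder for the methods an implementation already has. */
        void forward(String value);

        /** New accessor; the default keeps existing implementations compiling. */
        default long currentSystemTimeMs() {
            throw new UnsupportedOperationException(
                "this context does not support currentSystemTimeMs()");
        }

        /** New accessor; same default behaviour as above. */
        default long currentStreamTimeMs() {
            throw new UnsupportedOperationException(
                "this context does not support currentStreamTimeMs()");
        }
    }

    /** Written before the new methods existed; still compiles without changes. */
    static final class LegacyContext implements Context {
        @Override
        public void forward(String value) {
            // no-op for the purposes of this sketch
        }
    }

    /** An implementation that does support clock access overrides both methods. */
    static final class ClockContext implements Context {
        private final long systemTimeMs;
        private final long streamTimeMs;

        ClockContext(long systemTimeMs, long streamTimeMs) {
            this.systemTimeMs = systemTimeMs;
            this.streamTimeMs = streamTimeMs;
        }

        @Override
        public void forward(String value) {
            // no-op for the purposes of this sketch
        }

        @Override
        public long currentSystemTimeMs() {
            return systemTimeMs;
        }

        @Override
        public long currentStreamTimeMs() {
            return streamTimeMs;
        }
    }
}
```

The trade-off is that a missing override surfaces at run time rather than at compile time, which seems acceptable only if, as argued in this thread, all real implementations are internal.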

On 2020/07/01 02:24:43, Boyang Chen <re...@gmail.com> wrote: 
> Thanks Will for the KIP. A couple questions and suggestions:
> 
> 1. I think for new APIs to make most sense, we should add a minimal example
> demonstrating how it could be useful to structure unit tests w/o the new
> APIs.
> 2. If this is a testing-only feature, could we only add it
> to MockProcessorContext?
> 3. Regarding the API, since this will be added to the ProcessorContext with
> many subclasses, does it make sense to provide default implementations as
> well?
> 
> Boyang
> 
> On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <bo...@gmail.com>
> wrote:
> 
> > Thanks, John! I made the change. How much longer should I let there be
> > discussion before starting a VOTE?
> >
> > On Sat, Jun 27, 2020 at 6:50 AM John Roesler <vv...@apache.org> wrote:
> >
> > > Thanks, Will,
> > >
> > > That looks good to me. I would only add "cached" or something
> > > to indicate that it wouldn't just transparently look up the current
> > > System.currentTimeMillis every time.
> > >
> > > For example:
> > > /**
> > >  * Returns current cached wall-clock system timestamp in milliseconds.
> > >  *
> > >  * @return the current cached wall-clock system timestamp in milliseconds
> > >  */
> > > long currentSystemTimeMs();
> > >
> > > I don't want to give specific information about _when_ exactly the
> > > timestamp cache will be updated, so that we can adjust it in the
> > > future, but it does seem important to make people aware that they
> > > won't see the timestamp advance during the execution of
> > > Processor.process(), for example.
> > >
> > > With that modification, I'll be +1 on this proposal.
> > >
> > > Thanks again for the KIP!
> > > -John
> > >
> > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
> > > > Thanks, John! I appreciate you adjusting my lingo. I made the change to
> > > the
> > > > KIP. I will add the note about system time to the javadoc.
> > > >
> > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <vv...@apache.org>
> > > wrote:
> > > >
> > > > > Hi Will,
> > > > >
> > > > > This proposal looks good to me overall. Thanks for the contribution!
> > > > >
> > > > > Just a couple of minor notes:
> > > > >
> > > > > The system time method would return a cached timestamp that Streams
> > > looks
> > > > > up once when it starts processing a record. This may be confusing, so
> > > it
> > > > > might be good to state it in the javadoc.
> > > > >
> > > > > I thought the javadoc for the stream time might be a bit confusing.
> > We
> > > > > normally talk about “Tasks” not “partition groups” in the public api.
> > > Maybe
> > > > > just saying that it’s “the maximum timestamp of any record yet
> > > processed by
> > > > > the task” would be both high level and accurate.
> > > > >
> > > > > Thanks again!
> > > > > -John
> > > > >
> > > > > On Mon, Jun 22, 2020, at 02:10, William Bottrell wrote:
> > > > > > Thanks, Bruno. I updated the KIP, so hopefully it makes more sense.
> > > > > Thanks
> > > > > > to Matthias J. Sax and Piotr Smolinski for helping with details.
> > > > > >
> > > > > > I welcome more feedback. Let me know if something doesn't make
> > sense
> > > or I
> > > > > > need to provide more detail. Also, feel free to enlighten me.
> > Thanks!
> > > > > >
> > > > > > On Thu, Jun 11, 2020 at 1:11 PM Bruno Cadonna <br...@confluent.io>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Will,
> > > > > > >
> > > > > > > Thank you for the KIP.
> > > > > > >
> > > > > > > 1. Could you elaborate a bit more on the motivation in the KIP?
> > An
> > > > > > > example would make the motivation clearer.
> > > > > > >
> > > > > > > 2. In section "Proposed Changes" you do not need to show the
> > > > > > > implementation and describe internals. A description of the
> > > expected
> > > > > > > behavior of the newly added methods should suffice.
> > > > > > >
> > > > > > > 3. In "Compatibility, Deprecation, and Migration Plan" you should
> > > > > > > state that the change is backward compatible because the two
> > > methods
> > > > > > > will be added and no other method will be changed or removed.
> > > > > > >
> > > > > > > Best,
> > > > > > > Bruno
> > > > > > >
> > > > > > > On Wed, Jun 10, 2020 at 10:06 AM William Bottrell <
> > > bottrellw@gmail.com
> > > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Add currentSystemTimeMs and currentStreamTimeMs to
> > > ProcessorContext
> > > > > > > > <
> > > > > > >
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext
> > > > > > > >
> > > > > > > >
> > > > > > > > I am extremely new to Kafka, but thank you to John Roesler and
> > > > > Matthias
> > > > > > > J.
> > > > > > > > Sax for pointing me in the right direction. I accept any and
> > all
> > > > > > > feedback.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Will
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 

Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Posted by Boyang Chen <re...@gmail.com>.
Thanks Will for the KIP. A couple questions and suggestions:

1. I think for new APIs to make most sense, we should add a minimal example
demonstrating how it could be useful to structure unit tests w/o the new
APIs.
2. If this is a testing-only feature, could we only add it
to MockProcessorContext?
3. Regarding the API, since this will be added to the ProcessorContext with
many subclasses, does it make sense to provide default implementations as
well?

Boyang

On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <bo...@gmail.com>
wrote:

> Thanks, John! I made the change. How much longer should I let there be
> discussion before starting a VOTE?
>
> On Sat, Jun 27, 2020 at 6:50 AM John Roesler <vv...@apache.org> wrote:
>
> > Thanks, Will,
> >
> > That looks good to me. I would only add "cached" or something
> > to indicate that it wouldn't just transparently look up the current
> > System.currentTimeMillis every time.
> >
> > For example:
> > /**
> >  * Returns current cached wall-clock system timestamp in milliseconds.
> >  *
> >  * @return the current cached wall-clock system timestamp in milliseconds
> >  */
> > long currentSystemTimeMs();
> >
> > I don't want to give specific information about _when_ exactly the
> > timestamp cache will be updated, so that we can adjust it in the
> > future, but it does seem important to make people aware that they
> > won't see the timestamp advance during the execution of
> > Processor.process(), for example.
> >
> > With that modification, I'll be +1 on this proposal.
> >
> > Thanks again for the KIP!
> > -John
> >
> > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
> > > Thanks, John! I appreciate you adjusting my lingo. I made the change to
> > the
> > > KIP. I will add the note about system time to the javadoc.
> > >
> > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <vv...@apache.org>
> > wrote:
> > >
> > > > Hi Will,
> > > >
> > > > This proposal looks good to me overall. Thanks for the contribution!
> > > >
> > > > Just a couple of minor notes:
> > > >
> > > > The system time method would return a cached timestamp that Streams
> > looks
> > > > up once when it starts processing a record. This may be confusing, so
> > it
> > > > might be good to state it in the javadoc.
> > > >
> > > > I thought the javadoc for the stream time might be a bit confusing.
> We
> > > > normally talk about “Tasks” not “partition groups” in the public api.
> > Maybe
> > > > just saying that it’s “the maximum timestamp of any record yet
> > processed by
> > > > the task” would be both high level and accurate.
> > > >
> > > > Thanks again!
> > > > -John
> > > >
> > > > On Mon, Jun 22, 2020, at 02:10, William Bottrell wrote:
> > > > > Thanks, Bruno. I updated the KIP, so hopefully it makes more sense.
> > > > Thanks
> > > > > to Matthias J. Sax and Piotr Smolinski for helping with details.
> > > > >
> > > > > I welcome more feedback. Let me know if something doesn't make
> sense
> > or I
> > > > > need to provide more detail. Also, feel free to enlighten me.
> Thanks!
> > > > >
> > > > > On Thu, Jun 11, 2020 at 1:11 PM Bruno Cadonna <br...@confluent.io>
> > > > wrote:
> > > > >
> > > > > > Hi Will,
> > > > > >
> > > > > > Thank you for the KIP.
> > > > > >
> > > > > > 1. Could you elaborate a bit more on the motivation in the KIP?
> An
> > > > > > example would make the motivation clearer.
> > > > > >
> > > > > > 2. In section "Proposed Changes" you do not need to show the
> > > > > > implementation and describe internals. A description of the
> > expected
> > > > > > behavior of the newly added methods should suffice.
> > > > > >
> > > > > > 3. In "Compatibility, Deprecation, and Migration Plan" you should
> > > > > > state that the change is backward compatible because the two
> > methods
> > > > > > will be added and no other method will be changed or removed.
> > > > > >
> > > > > > Best,
> > > > > > Bruno
> > > > > >
> > > > > > On Wed, Jun 10, 2020 at 10:06 AM William Bottrell <
> > bottrellw@gmail.com
> > > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > Add currentSystemTimeMs and currentStreamTimeMs to
> > ProcessorContext
> > > > > > > <
> > > > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext
> > > > > > >
> > > > > > >
> > > > > > > I am extremely new to Kafka, but thank you to John Roesler and
> > > > Matthias
> > > > > > J.
> > > > > > > Sax for pointing me in the right direction. I accept any and
> all
> > > > > > feedback.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Will
> > > > > >
> > > > >
> > > >
> > >
> >
>