Posted to dev@kafka.apache.org by Rohit Deshpande <ro...@gmail.com> on 2020/11/28 04:51:53 UTC

Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Hi,
I would like to revive this KIP.
1. As per the proposed solution, we want to add the following method to the ProcessorContext interface:
/**
 * Returns current cached wall-clock system timestamp in milliseconds.
 *
 * @return the current cached wall-clock system timestamp in milliseconds
 */
long currentSystemTimeMs();
However, InternalProcessorContext already contains the same method: https://github.com/guozhangwang/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java#L54
Would it make more sense to remove this method from InternalProcessorContext and add it to ProcessorContext?
2. I am thinking of adding one test in TopologyTestDriverTest where a Processor uses currentSystemTimeMs() to decide what to do with an incoming record by comparing the record's timestamp with the wall-clock time. Similarly, we can have
another test where the Processor fetches the stream time and takes an action on the incoming record based on it.
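A minimal sketch of the kind of processor logic point 2 describes, written in plain Java so it runs without Kafka on the classpath. `TimeSource` is a hypothetical stand-in for the two proposed `ProcessorContext` methods, and `LateRecordFilter`, `FixedTimeSource`, and `maxAgeMs` are illustrative names, not part of the KIP. Because the logic only talks to the interface, a test can supply fixed times instead of relying on `System.currentTimeMillis()`.

```java
// Hypothetical stand-in for the two methods KIP-622 proposes for ProcessorContext.
interface TimeSource {
    long currentSystemTimeMs();
    long currentStreamTimeMs();
}

// Illustrative processor logic: accept or drop a record by comparing its
// timestamp with wall-clock time, or with stream time.
final class LateRecordFilter {
    private final TimeSource time;
    private final long maxAgeMs;

    LateRecordFilter(TimeSource time, long maxAgeMs) {
        this.time = time;
        this.maxAgeMs = maxAgeMs;
    }

    // True if the record is at most maxAgeMs older than the (cached) wall-clock time.
    boolean acceptByWallClock(long recordTimestampMs) {
        return time.currentSystemTimeMs() - recordTimestampMs <= maxAgeMs;
    }

    // True if the record is at most maxAgeMs older than the current stream time.
    boolean acceptByStreamTime(long recordTimestampMs) {
        return time.currentStreamTimeMs() - recordTimestampMs <= maxAgeMs;
    }
}

// Hypothetical test double: both times are fixed by the test, never read from the system clock.
final class FixedTimeSource implements TimeSource {
    private final long systemTimeMs;
    private final long streamTimeMs;

    FixedTimeSource(long systemTimeMs, long streamTimeMs) {
        this.systemTimeMs = systemTimeMs;
        this.streamTimeMs = streamTimeMs;
    }

    @Override public long currentSystemTimeMs() { return systemTimeMs; }
    @Override public long currentStreamTimeMs() { return streamTimeMs; }
}
```

With wall-clock time fixed at 10000 and a one-second limit, a record stamped 9500 is accepted and one stamped 8000 is dropped, on every run.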


Thanks,
Rohit


On 2020/08/14 05:07:04, "John Roesler" <v....@apache.org> wrote:
> Thanks for the reply, Matthias,
>
> I see what you mean. I suppose I was thinking that we would pass in the cached system time, which is also what we’re proposing to add to the ProcessorContext.
>
> If you think there’s something about the timestamp extractor in particular that would make people want more precision, then something like Time would do the trick. Since it’s not a public API, maybe just `Supplier<Long>` would be appropriate.
>
> But I also don’t want to bikeshed it. My only concern was that it’s awkward to ask people to actually change their application code for testing. But maybe in this case, an option is better than no option, and if people don’t like it, we can always deprecate the mock extractor and evolve the interface later.
>
> So, I’m +1 either way.
>
> Thanks,
> John
>
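John's `Supplier<Long>` idea has one property relevant to Matthias's cost concern: a supplier is only read if the extractor calls it, so an extractor that ignores wall-clock time never triggers a clock lookup. The sketch below uses `java.util.function.LongSupplier` (the primitive specialization of `Supplier<Long>`). The `LazyTimeExtractor` interface and its simplified signature (the record reduced to its timestamp, no `ConsumerRecord`) are hypothetical, not an API that was adopted; the fallback to partition time for a negative timestamp is an illustrative choice.

```java
import java.util.function.LongSupplier;

// Hypothetical extractor shape: wall-clock time is handed in lazily.
interface LazyTimeExtractor {
    long extract(long recordTimestampMs, long partitionTimeMs, LongSupplier systemTimeMs);
}

// Uses the record's own timestamp (falling back to partition time if negative); never reads the clock.
final class RecordTimeExtractor implements LazyTimeExtractor {
    @Override
    public long extract(long recordTimestampMs, long partitionTimeMs, LongSupplier systemTimeMs) {
        return recordTimestampMs >= 0 ? recordTimestampMs : partitionTimeMs;
    }
}

// Wall-clock style extractor: the only variant that pays for a clock lookup.
final class WallClockExtractor implements LazyTimeExtractor {
    @Override
    public long extract(long recordTimestampMs, long partitionTimeMs, LongSupplier systemTimeMs) {
        return systemTimeMs.getAsLong();
    }
}

// Counts clock reads so that the laziness is observable in a test.
final class CountingClock implements LongSupplier {
    private final long nowMs;
    private int reads = 0;

    CountingClock(long nowMs) { this.nowMs = nowMs; }

    @Override
    public long getAsLong() {
        reads++;
        return nowMs;
    }

    int reads() { return reads; }
}
```

A runtime could pass its cached time as `() -> cachedNowMs` and a test driver could pass its mocked time the same way, without exposing `Time` itself.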
> On Mon, Aug 3, 2020, at 16:28, Matthias J. Sax wrote:
> > Interesting proposal.
> >
> > However, it raises the question of how the runtime would pass in the
> > `systemTime` parameter. To be accurate, we would need to call
> > `Time.milliseconds()` each time before we call the timestamp extractor.
> > This sounds expensive, and maybe the extractor does not even use this value.
> >
> > Or we only call `Time.milliseconds()` periodically (as we also do in our
> > runtime code) to make it cheap, but then we lose precision. Not sure if
> > we can make this trade-off for the user?
> >
> > Handing in the `Time` object itself might be another idea; however, it
> > seems "dangerous", as it does not seem to be actually public API?
> >
> > Last, do we really think we need this feature? We never had a feature
> > request for it and I am not aware of any issue with the current
> > TimestampExtractor interface.
> >
> > It's always easier to add it later if there is real demand, instead of
> > pro-actively changing it (and maybe needing to deprecate and remove it
> > later) with no clear benefit. Adding the `MockTimestampsExtractor` as
> > part of the test-utils package seems less "dangerous" and should do the
> > job, allowing us to collect feedback. If it's not good enough, we can
> > still change the TimestampExtractor interface as a follow-up.
> >
> >
> > -Matthias
> >
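The periodic option Matthias describes (read the clock once, reuse the value) is also what "cached" means in the proposed `currentSystemTimeMs()` javadoc. A minimal sketch of such a cache follows; `CachedClock` and `refresh()` are hypothetical names, the refresh point is assumed to be chosen by the caller, and the underlying clock is injected as a `LongSupplier` so tests can substitute a fake. This is an illustration, not the Streams internals.

```java
import java.util.function.LongSupplier;

// Illustrative cached clock: one real clock read per refresh, many cheap reads in between.
final class CachedClock {
    private final LongSupplier source;
    private long cachedMs;

    CachedClock(LongSupplier source) {
        this.source = source;
        this.cachedMs = source.getAsLong();
    }

    // Called by the runtime at a point of its choosing, e.g. before processing a record.
    void refresh() {
        cachedMs = source.getAsLong();
    }

    // Cheap: no clock lookup. The value may lag real time until the next refresh().
    long currentSystemTimeMs() {
        return cachedMs;
    }
}
```

Both sides of the trade-off are visible here: the cost is one clock read per `refresh()`, and the loss of precision is that reads between refreshes return a stale value, which is why the value does not advance during a single `Processor.process()` call.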
> > On 7/28/20 10:03 AM, John Roesler wrote:
> > > Thanks Matthias,
> > >
> > > This is a really good point. It might be a bummer
> > > to have to actually change the topology between
> > > testing and production. Do you think we can rather
> > > evolve the TimestampExtractor interface to let
> > > Streams pass in the current system time, along with
> > > the current record and the current partition time?
> > >
> > > For example, we could add a new method:
> > > long extract(
> > >   ConsumerRecord<Object, Object> record,
> > >   long partitionTime,
> > >   long systemTime
> > > );
> > >
> > > Then, Streams could pass in the current system
> > > time and TopologyTestDriver could pass the mocked
> > > time. Additionally, users who implement
> > > TimestampExtractor on their own would be able to
> > > deterministically unit-test their own implementation.
> > >
> > > It's always a challenge to add to an interface without
> > > breaking compatibility. In this case, it seems like
> > > we could provide a default implementation that just
> > > ignores the systemTime argument and calls
> > > extract(record, partitionTime), and also deprecate
> > > the existing method. Then custom implementations
> > > would get a deprecation warning telling them to
> > > implement the other method, and when we remove
> > > the deprecated extract(record, partitionTime), we can
> > > also drop the default implementation from the new
> > > method.
> > >
> > > Specifically, what do you think about:
> > > =================================
> > > public interface TimestampExtractor {
> > >     /**
> > >      * ...
> > >      * @deprecated Since 2.7. Implement
> > >      *   {@code extract(ConsumerRecord<Object, Object> record, long partitionTime, long systemTime)} instead.
> > >      */
> > >     @Deprecated
> > >     long extract(
> > >       ConsumerRecord<Object, Object> record,
> > >       long partitionTime
> > >     );
> > >
> > >     default long extract(
> > >       ConsumerRecord<Object, Object> record,
> > >       long partitionTime,
> > >       long systemTime) {
> > >         return extract(record, partitionTime);
> > >     }
> > > }
> > > =================================
> > >
> > > Thanks,
> > > -John
> > >
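John's compatibility plan relies on a Java default method. The sketch below replaces `ConsumerRecord<Object, Object>` with a hypothetical `Rec` holder so it compiles without Kafka; `EvolvingExtractor`, `LegacyExtractor`, and `ClockAwareExtractor` are illustrative names. It shows that an old implementation overriding only the deprecated two-argument method keeps working when the runtime calls the three-argument overload. It also shows one cost of this plan: while the deprecated method stays abstract, a new implementation must still provide it.

```java
// Hypothetical stand-in for ConsumerRecord<Object, Object>: only the timestamp matters here.
final class Rec {
    final long timestamp;
    Rec(long timestamp) { this.timestamp = timestamp; }
}

interface EvolvingExtractor {
    /** @deprecated implement {@link #extract(Rec, long, long)} instead. */
    @Deprecated
    long extract(Rec record, long partitionTime);

    // New entry point the runtime would call; the default keeps old implementations working.
    default long extract(Rec record, long partitionTime, long systemTime) {
        return extract(record, partitionTime);
    }
}

// Written against the old interface: overrides only the two-argument method.
final class LegacyExtractor implements EvolvingExtractor {
    @Override
    public long extract(Rec record, long partitionTime) {
        return record.timestamp;
    }
}

// Written against the new interface: uses the system time it is handed.
final class ClockAwareExtractor implements EvolvingExtractor {
    // Still required while the deprecated method is abstract; never called by the new runtime path.
    @Deprecated
    @Override
    public long extract(Rec record, long partitionTime) {
        throw new UnsupportedOperationException("use the three-argument overload");
    }

    @Override
    public long extract(Rec record, long partitionTime, long systemTime) {
        return systemTime;
    }
}
```

Once the deprecated method is removed, the new method can lose its default body, and implementations like `ClockAwareExtractor` can drop their placeholder override.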
> > > On Sun, Jul 26, 2020, at 15:47, Matthias J. Sax wrote:
> > >> Hi,
> > >>
> > >> I just had one more thought about an additional improvement we might
> > >> want to include in this KIP.
> > >>
> > >> Kafka Streams ships with a `WallclockTimestampExtractor` that just
> > >> returns `System.currentTimeMillis()` and thus cannot be mocked. And it
> > >> seems that there is no way for a user to build a custom timestamp
> > >> extractor that returns the TTD's mocked system time.
> > >>
> > >> Thus, it might be nice to add a `MockTimeExtractor` (only in the
> > >> test-utils package) that users could set and that returns the mocked
> > >> system time.
> > >>
> > >> Thoughts?
> > >>
> > >>
> > >> -Matthias
> > >>
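A sketch of the `MockTimeExtractor` idea: an extractor that behaves like a wall-clock extractor but reads a mock clock the test advances explicitly, standing in for the TopologyTestDriver's mocked time. `MockWallClock`, `advance()`, and the simplified `extract(long, long)` signature are hypothetical; a real version would implement `TimestampExtractor` and read the driver's time.

```java
// Hypothetical mock clock that only moves when the test advances it.
final class MockWallClock {
    private long nowMs;

    MockWallClock(long startMs) { this.nowMs = startMs; }

    void advance(long deltaMs) {
        if (deltaMs < 0) throw new IllegalArgumentException("mock time cannot go backwards");
        nowMs += deltaMs;
    }

    long milliseconds() { return nowMs; }
}

// Like a wall-clock extractor, but reads the mock clock instead of System.currentTimeMillis().
final class MockTimeExtractor {
    private final MockWallClock clock;

    MockTimeExtractor(MockWallClock clock) { this.clock = clock; }

    // Record and partition time are ignored, as with a wall-clock extractor.
    long extract(long recordTimestampMs, long partitionTimeMs) {
        return clock.milliseconds();
    }
}
```

A test then controls every extracted timestamp: it starts the clock at a known value, feeds records, and calls `advance()` between inputs.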
> > >> On 7/7/20 11:11 PM, Matthias J. Sax wrote:
> > >>> I think we don't need a default implementation for the new methods.
> > >>>
> > >>> What would be the use-case to implement the `ProcessorContext`
> > >>> interface? In contrast to, for example, `KeyValueStore`,
> > >>> `ProcessorContext` is a use-only interface because it's never passed
> > >>> into Kafka Streams, but only handed out to the user.
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>> On 7/7/20 1:28 PM, William Bottrell wrote:
> > >>>> Sure, I would appreciate help from Piotr creating an example.
> > >>>>
> > >>>> On Tue, Jul 7, 2020 at 12:03 PM Boyang Chen <re...@gmail.com> wrote:
> > >>>>
> > >>>>> Hey John,
> > >>>>>
> > >>>>> Since ProcessorContext is a public API, I couldn't be sure that people
> > >>>>> won't try to extend it. Without a default implementation, compilation
> > >>>>> of their code will break.
> > >>>>>
> > >>>>> William and Piotr, it seems that we haven't added any example usage of the
> > >>>>> new API; could we try to address that? It should help with the motivation
> > >>>>> and follow-up meta comments as John proposed.
> > >>>>>
> > >>>>> Boyang
> > >>>>>
> > >>>>> On Mon, Jul 6, 2020 at 12:04 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>
> > >>>>>> William,
> > >>>>>>
> > >>>>>> thanks for the KIP. LGTM. Feel free to start a vote.
> > >>>>>>
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>>
> > >>>>>> On 7/4/20 10:14 AM, John Roesler wrote:
> > >>>>>>> Hi Richard,
> > >>>>>>>
> > >>>>>>> It’s good to hear from you!
> > >>>>>>>
> > >>>>>>> Thanks for bringing up the wall-clock suppression feature. IIRC, someone
> > >>>>>>> actually started a KIP discussion for it already, but I don’t think it went
> > >>>>>>> to a vote. I don’t recall any technical impediment, just the lack of
> > >>>>>>> availability to finish it up. Although there is some association, it would
> > >>>>>>> be good to keep the KIPs separate.
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>> John
> > >>>>>>>
> > >>>>>>> On Sat, Jul 4, 2020, at 10:05, Richard Yu wrote:
> > >>>>>>>> Hi all,
> > >>>>>>>>
> > >>>>>>>> This reminds me of a previous issue that I think we were discussing.
> > >>>>>>>> @John Roesler <ma...@apache.org> I think you should remember
> > >>>>>>>> this one.
> > >>>>>>>>
> > >>>>>>>> A while back, we were talking about having the suppress operator emit
> > >>>>>>>> records by wall-clock time instead of stream time.
> > >>>>>>>> If we are adding this, wouldn't that make it more feasible for us to
> > >>>>>>>> implement that feature for suppression?
> > >>>>>>>>
> > >>>>>>>> If I recall correctly, there actually had been quite a bit of user
> > >>>>>>>> demand for such a feature.
> > >>>>>>>> It might be good to include it in this KIP (or maybe use one of the prior
> > >>>>>>>> KIPs for this feature).
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>> Richard
> > >>>>>>>>
> > >>>>>>>> On Sat, Jul 4, 2020 at 6:58 AM John Roesler <vv...@apache.org> wrote:
> > >>>>>>>>> Hi all,
> > >>>>>>>>>
> > >>>>>>>>>  1. Thanks, Boyang, it is nice to see usage examples in KIPs like
> > >>>>>>>>>  this. It helps during the discussion, and it’s also good documentation
> > >>>>>>>>>  later on.
> > >>>>>>>>>
> > >>>>>>>>>  2. Yeah, this is a subtle point. The motivation mentions being able
> > >>>>>>>>>  to control the time during tests, but to be able to make it work, the
> > >>>>>>>>>  processor implementation needs a public method on ProcessorContext to get
> > >>>>>>>>>  the time. Otherwise, processors would have to check the type of the context
> > >>>>>>>>>  and cast, depending on whether they’re running inside a test or not. In
> > >>>>>>>>>  retrospect, if we’d had a usage example, this probably would have been
> > >>>>>>>>>  clear.
> > >>>>>>>>>
> > >>>>>>>>>  3. I don’t think we expect people to have their own implementations
> > >>>>>>>>>  of ProcessorContext. Since all implementations are internal, it’s really an
> > >>>>>>>>>  implementation detail whether we use a default method, abstract methods, or
> > >>>>>>>>>  concrete methods. I can’t think of an implementation that really wants to
> > >>>>>>>>>  just look up the system time. In the production code path, we cache the
> > >>>>>>>>>  time for performance, and in testing, we use a mock time.
> > >>>>>>>>>
> > >>>>>>>>>  Thanks,
> > >>>>>>>>>  John
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>  On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
> > >>>>>>>>>  > 1. Makes sense; let me propose something.
> > >>>>>>>>>  >
> > >>>>>>>>>  > 2. That's not testing-only. The goal is to use the same API to access
> > >>>>>>>>>  > the time in deployment and testing environments. The major driver is
> > >>>>>>>>>  > System.currentTimeMillis(), which a) cannot be used in tests, b) could,
> > >>>>>>>>>  > in specific cases, go backwards, and c) is not compatible with
> > >>>>>>>>>  > punctuator calls. The idea is that we could access the clock using a
> > >>>>>>>>>  > uniform API. For completeness, we should have the same API for system
> > >>>>>>>>>  > and stream time.
> > >>>>>>>>>  >
> > >>>>>>>>>  > 3. There aren't that many subclasses. Two important ones are
> > >>>>>>>>>  > ProcessorContextImpl and MockProcessorContext (and a third one:
> > >>>>>>>>>  > ForwardingDisabledProcessorContext). If a given implementation does
> > >>>>>>>>>  > not support the schedule() call, there is no reason for it to support
> > >>>>>>>>>  > clock access. The default implementation should just throw
> > >>>>>>>>>  > UnsupportedOperationException, to prevent compilation errors in
> > >>>>>>>>>  > possible subclasses.
> > >>>>>>>>>  >
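The default Piotr suggests (throw `UnsupportedOperationException`) addresses Boyang's concern that implementations of a public interface would otherwise stop compiling. The sketch below uses a hypothetical `ClockAccess` interface rather than `ProcessorContext`, with illustrative `OldContext` and `NewContext` classes. It shows both effects: a pre-existing implementation compiles unchanged, and calling the new method on it fails at run time instead of at compile time. Whether such external implementations exist at all is the point Matthias and John question; the sketch only shows the mechanics.

```java
// Hypothetical interface that gained two clock methods in a later release.
interface ClockAccess {
    String applicationId();

    // Added later; the default keeps older implementations compiling.
    default long currentSystemTimeMs() {
        throw new UnsupportedOperationException("currentSystemTimeMs() is not supported");
    }

    default long currentStreamTimeMs() {
        throw new UnsupportedOperationException("currentStreamTimeMs() is not supported");
    }
}

// Written before the new methods existed; compiles without changes.
final class OldContext implements ClockAccess {
    @Override public String applicationId() { return "app"; }
}

// Newer implementation that does support clock access.
final class NewContext implements ClockAccess {
    @Override public String applicationId() { return "app"; }
    @Override public long currentSystemTimeMs() { return 1_000L; }
    @Override public long currentStreamTimeMs() { return 900L; }
}
```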
> > >>>>>>>>>  > On 2020/07/01 02:24:43, Boyang Chen <re...@gmail.com> wrote:
> > >>>>>>>>>  > > Thanks Will for the KIP. A couple of questions and suggestions:
> > >>>>>>>>>  > >
> > >>>>>>>>>  > > 1. I think for new APIs to make the most sense, we should add a
> > >>>>>>>>>  > > minimal example demonstrating how it could be useful to structure
> > >>>>>>>>>  > > unit tests w/o the new APIs.
> > >>>>>>>>>  > > 2. If this is a testing-only feature, could we only add it
> > >>>>>>>>>  > > to MockProcessorContext?
> > >>>>>>>>>  > > 3. Regarding the API, since this will be added to the ProcessorContext
> > >>>>>>>>>  > > with many subclasses, does it make sense to provide default
> > >>>>>>>>>  > > implementations as well?
> > >>>>>>>>>  > >
> > >>>>>>>>>  > > Boyang
> > >>>>>>>>>  > >
> > >>>>>>>>>  > > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <bottrellw@gmail.com> wrote:
> > >>>>>>>>>  > >
> > >>>>>>>>>  > > > Thanks, John! I made the change. How much longer should I let there be
> > >>>>>>>>>  > > > discussion before starting a VOTE?
> > >>>>>>>>>  > > >
> > >>>>>>>>>  > > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler <vvcephei@apache.org> wrote:
> > >>>>>>>>>  > > >
> > >>>>>>>>>  > > > > Thanks, Will,
> > >>>>>>>>>  > > > >
> > >>>>>>>>>  > > > > That looks good to me. I would only add "cached" or something
> > >>>>>>>>>  > > > > to indicate that it wouldn't just transparently look up the current
> > >>>>>>>>>  > > > > System.currentTimeMillis every time.
> > >>>>>>>>>  > > > >
> > >>>>>>>>>  > > > > For example:
> > >>>>>>>>>  > > > > /**
> > >>>>>>>>>  > > > >  * Returns current cached wall-clock system timestamp in milliseconds.
> > >>>>>>>>>  > > > >  *
> > >>>>>>>>>  > > > >  * @return the current cached wall-clock system timestamp in milliseconds
> > >>>>>>>>>  > > > >  */
> > >>>>>>>>>  > > > > long currentSystemTimeMs();
> > >>>>>>>>>  > > > >
> > >>>>>>>>>  > > > > I don't want to give specific information about _when_ exactly the
> > >>>>>>>>>  > > > > timestamp cache will be updated, so that we can adjust it in the
> > >>>>>>>>>  > > > > future, but it does seem important to make people aware that they
> > >>>>>>>>>  > > > > won't see the timestamp advance during the execution of
> > >>>>>>>>>  > > > > Processor.process(), for example.
> > >>>>>>>>>  > > > >
> > >>>>>>>>>  > > > > With that modification, I'll be +1 on this proposal.
> > >>>>>>>>>  > > > >
> > >>>>>>>>>  > > > > Thanks again for the KIP!
> > >>>>>>>>>  > > > > -John
> > >>>>>>>>>  > > > >
> > >>>>>>>>>  > > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
> > >>>>>>>>>  > > > > > Thanks, John! I appreciate you adjusting my lingo. I made the change to
> > >>>>>>>>>  > > > > > the KIP. I will add the note about system time to the javadoc.
> > >>>>>>>>>  > > > > >
> > >>>>>>>>>  > > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <vvcephei@apache.org> wrote:
> > >>>>>>>>>  > > > > >
> > >>>>>>>>>  > > > > > > Hi Will,
> > >>>>>>>>>  > > > > > >
> > >>>>>>>>>  > > > > > > This proposal looks good to me overall. Thanks for the contribution!
> > >>>>>>>>>  > > > > > >
> > >>>>>>>>>  > > > > > > Just a couple of minor notes:
> > >>>>>>>>>  > > > > > >
> > >>>>>>>>>  > > > > > > The system time method would return a cached timestamp that Streams looks
> > >>>>>>>>>  > > > > > > up once when it starts processing a record. This may be confusing, so it
> > >>>>>>>>>  > > > > > > might be good to state it in the javadoc.
> > >>>>>>>>>  > > > > > >
> > >>>>>>>>>  > > > > > > I thought the javadoc for the stream time might be a bit confusing. We
> > >>>>>>>>>  > > > > > > normally talk about “Tasks”, not “partition groups”, in the public API. Maybe
> > >>>>>>>>>  > > > > > > just saying that it’s “the maximum timestamp of any record yet processed by
> > >>>>>>>>>  > > > > > > the task” would be both high level an
[message truncated...]
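John's suggested wording defines stream time as "the maximum timestamp of any record yet processed by the task". A minimal sketch of that definition follows; `StreamTimeTracker`, `observe()`, and the `UNKNOWN = -1` starting sentinel are assumptions of this sketch, not Streams internals. Because it keeps a running maximum, an out-of-order record never moves stream time backwards.

```java
// Illustrative tracker for "the maximum timestamp of any record yet processed by the task".
final class StreamTimeTracker {
    // Sentinel for "no record processed yet"; an assumption of this sketch.
    static final long UNKNOWN = -1L;

    private long streamTimeMs = UNKNOWN;

    // Called once per processed record; older (out-of-order) records leave stream time unchanged.
    void observe(long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
    }

    long currentStreamTimeMs() {
        return streamTimeMs;
    }
}
```

Processing records stamped 100, 50, and 200 in that order yields stream times of 100, 100, and 200.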

Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

Posted by "Matthias J. Sax" <mj...@apache.org>.
1. We don't need to worry about implementation details. But yes, we can remove the
method from the internal context, which already extends `ProcessorContext`.

2. Same here: we can discuss it on the PR.


Btw: it seems you got enough votes. Can you close the vote? Looking
forward to your PR.


-Matthias
