Posted to jira@kafka.apache.org by "William Bottrell (Jira)" <ji...@apache.org> on 2020/06/05 07:16:00 UTC

[jira] [Commented] (KAFKA-10062) Add a method to retrieve the current timestamp as known by the Streams app

    [ https://issues.apache.org/jira/browse/KAFKA-10062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126467#comment-17126467 ] 

William Bottrell commented on KAFKA-10062:
------------------------------------------

I'm very new to Kafka, but I'd like to help with this issue. I'm working on getting permission to create a KIP.

I'd like to get a head start on the problem:

Adding #currentSystemTimeMs to the public API makes sense to me because it is already implemented. 

Could someone point me in the right direction for implementing #currentStreamTimeMs?

> Add a method to retrieve the current timestamp as known by the Streams app
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-10062
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10062
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Piotr Smolinski
>            Priority: Major
>              Labels: needs-kip, newbie
>
> Please add to ProcessorContext a method to retrieve the current timestamp, compatible with the Punctuator#punctate(long) method.
> Proposal in ProcessorContext:
> long getTimestamp(PunctuationType type);
> The method should return the time value as known by the Punctuator scheduler for the respective PunctuationType.
> The use-case is tracking of a process with timeout-based escalation.
> A transformer receives process events and, if an expected event is missing, executes an action (emits a message) after a given escalation timeout (in several stages). The initial message may already arrive with a reference timestamp in the past and may trigger a different action upon arrival, depending on how far in the past it is.
> If the timeout only had to be computed against some future time, Punctuator would be perfectly sufficient. The problem is that I have to evaluate the current time-related state as soon as the message arrives.
> I am using wall-clock time. Normally, calling System.currentTimeMillis() is sufficient, but it breaks in unit tests with TopologyTestDriver, where the app's wall-clock time differs from the system-wide one.
> To access that clock, I am using reflection to reach ProcessorContextImpl#task and then StreamTask#time.
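
To make the use case above concrete, here is a minimal sketch of evaluating the escalation stage on arrival against an injected clock. It deliberately uses no Kafka Streams classes: EscalationTracker, stageOnArrival and the timeout values are hypothetical names chosen for illustration, and the LongSupplier stands in for whatever time source the requested ProcessorContext method would eventually expose.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper, not part of Kafka Streams: decides the escalation
 * stage of an event on arrival, reading "now" from an injected clock
 * instead of calling System.currentTimeMillis() directly.
 */
final class EscalationTracker {
    // Assumed source of wall-clock time in milliseconds.
    private final LongSupplier clock;
    // Escalation timeouts in milliseconds, one per stage.
    private final long[] stageTimeoutsMs;

    EscalationTracker(LongSupplier clock, long... stageTimeoutsMs) {
        this.clock = clock;
        this.stageTimeoutsMs = stageTimeoutsMs.clone();
    }

    /**
     * Returns how many escalation timeouts have already elapsed between
     * the event's reference timestamp and the clock's current time.
     */
    int stageOnArrival(long referenceTimestampMs) {
        long elapsedMs = clock.getAsLong() - referenceTimestampMs;
        int stage = 0;
        for (long timeoutMs : stageTimeoutsMs) {
            if (elapsedMs >= timeoutMs) {
                stage++;
            }
        }
        return stage;
    }
}
```

In production code the clock could be System::currentTimeMillis, while a test could pass a lambda over a mutable value and advance it by hand. This only works around the problem at the application level; it does not give access to the wall clock that TopologyTestDriver itself advances, which is what the issue asks for.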


