Posted to jira@kafka.apache.org by "Igor Piddubnyi (Jira)" <ji...@apache.org> on 2020/07/23 12:47:00 UTC

[jira] [Commented] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress

    [ https://issues.apache.org/jira/browse/KAFKA-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163529#comment-17163529 ] 

Igor Piddubnyi commented on KAFKA-8582:
---------------------------------------

Hi [~mjsax], as discussed in the PR, please assign this ticket to me.

> Consider adding an ExpiredWindowRecordHandler to Suppress
> ---------------------------------------------------------
>
>                 Key: KAFKA-8582
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8582
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Priority: Major
>
> I got some feedback on Suppress:
> {quote}Specifying how to handle events outside the grace period does seem like a business concern, and simply discarding them thus seems risky (for example imagine any situation where money is involved).
> This sort of situation is addressed by the late-triggering approach associated with watermarks (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given this I wondered if you were considering adding anything similar?{quote}
> It seems like, if a record arrives past the grace period for its window, the state of the windowed aggregation has already been lost, so any aggregation result we computed for it would be incorrect. Moreover, since the window is already expired, we can't store the new (incorrect, but more importantly expired) aggregation result either, so any subsequent super-late records would face the same blank slate. I think this would wind up looking like this: if you have three timely records for a window followed by three more that arrive after the grace period, and you are doing a count aggregation, you'd see the counts emitted for the window as [1, 2, 3, 1, 1, 1]. We could add a flag to the post-expiration results to indicate that they're broken, but that seems like the wrong approach. The post-expiration aggregation _results_ are meaningless, but I could see wanting to send the past-expiration _input records_ to a dead-letter queue or something instead of dropping them.
> Along this line of thinking, I wonder if we should add an optional past-expiration record handler interface to the suppression operator. You could then define your own logic, whether that's a dead-letter queue, an alerting pipeline, or simply crashing the application before it can do something wrong. This would follow the same pattern we use to allow custom handling of deserialization errors via an org.apache.kafka.streams.errors.DeserializationExceptionHandler.
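To make the proposal concrete, here is a toy sketch. Everything below is hypothetical: the interface name ExpiredWindowRecordHandler and its method signature are invented for illustration (loosely modeled on DeserializationExceptionHandler), and the windowed counter is a deliberately simplified stand-in for the real Suppress operator, not actual Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExpiredWindowDemo {

    /** Hypothetical callback for records arriving past the grace period. */
    interface ExpiredWindowRecordHandler<K, V> {
        void onExpiredRecord(K key, V value, long recordTimestamp, long windowEnd);
    }

    /** Toy aggregation over a single window [0, windowEnd) with a grace period. */
    static class ToyWindowedCount {
        final long windowEnd;
        final long graceMs;
        final ExpiredWindowRecordHandler<String, String> handler;
        final Map<String, Long> counts = new HashMap<>();
        final List<Long> emitted = new ArrayList<>();
        long observedStreamTime = 0L;

        ToyWindowedCount(long windowEnd, long graceMs,
                         ExpiredWindowRecordHandler<String, String> handler) {
            this.windowEnd = windowEnd;
            this.graceMs = graceMs;
            this.handler = handler;
        }

        void process(String key, String value, long timestamp) {
            observedStreamTime = Math.max(observedStreamTime, timestamp);
            if (timestamp >= windowEnd) {
                return; // not in this window; it only advances stream time
            }
            if (observedStreamTime > windowEnd + graceMs) {
                // Window expired: instead of silently dropping the record,
                // hand it to the user-supplied handler (dead-letter queue, etc.).
                handler.onExpiredRecord(key, value, timestamp, windowEnd);
                return;
            }
            emitted.add(counts.merge(key, 1L, Long::sum));
        }
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        ToyWindowedCount op = new ToyWindowedCount(100L, 10L,
                (k, v, ts, end) -> deadLetters.add(k + "@" + ts));

        // Three timely records for the window [0, 100) ...
        op.process("k", "a", 10L);
        op.process("k", "b", 20L);
        op.process("k", "c", 30L);
        // ... stream time advances past windowEnd + grace ...
        op.process("other", "x", 200L);
        // ... then three super-late records for the same window.
        op.process("k", "d", 40L);
        op.process("k", "e", 50L);
        op.process("k", "f", 60L);

        System.out.println("emitted=" + op.emitted);       // emitted=[1, 2, 3]
        System.out.println("deadLetters=" + deadLetters);  // deadLetters=[k@40, k@50, k@60]
    }
}
```

With the handler in place, the counts emitted stay [1, 2, 3] rather than the broken [1, 2, 3, 1, 1, 1] sequence described above, and the late records are captured instead of vanishing.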



--
This message was sent by Atlassian Jira
(v8.3.4#803005)