Posted to jira@kafka.apache.org by "John Roesler (Jira)" <ji...@apache.org> on 2020/05/01 21:35:00 UTC

[jira] [Commented] (KAFKA-7224) KIP-328: Add spill-to-disk for Suppression

    [ https://issues.apache.org/jira/browse/KAFKA-7224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097684#comment-17097684 ] 

John Roesler commented on KAFKA-7224:
-------------------------------------

Hi all,

Thanks for the good points all around.

Just to close the loop on _this_ ticket (disk-based suppression): its performance was _extremely_ poor, so much so that my thinking was that for anyone with high enough volume to actually need suppression, it would be too slow to be useful. The problem is that we need to check the head of the suppression buffer on every (or almost every) record to see whether we need to emit something. For an in-memory store, this is fine, but for RocksDB in particular, scan performance is very slow. There are fundamental reasons for this, which we don't need to get into here.

It might be possible to cleverly engineer our way around the problem, but anything I came up with just sounded too complicated to be worth it.

However, that per-record check is only necessary if you want the semantics of Suppress (each record times out individually, based on stream time). If you instead just want to buffer everything on disk and then emit everything you've buffered, say once an hour, you can do it much more efficiently in a custom FlatTransformValues: put all incoming data into the store, then schedule a wall-clock punctuation that scans the entire store and forwards everything.
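
To make the shape of that concrete, here is a minimal sketch in plain Java. It is not Kafka Streams code: {{BufferThenFlush}}, {{onRecord}} and {{onWallClockPunctuate}} are hypothetical names, the {{TreeMap}} merely stands in for a persistent key-value store, and it assumes that keeping only the latest value per key is acceptable. The point is just that the per-record path is a single write, and the only scan happens once per flush.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical illustration of "buffer everything, flush on a timer".
// Not the Kafka Streams API; the TreeMap stands in for an on-disk store.
final class BufferThenFlush<K extends Comparable<K>, V> {
    private final TreeMap<K, V> store = new TreeMap<>();
    private final BiConsumer<K, V> downstream;

    BufferThenFlush(BiConsumer<K, V> downstream) {
        this.downstream = downstream;
    }

    // Per-record path: one write, no scan of the buffer.
    // A later value for the same key overwrites the earlier one.
    void onRecord(K key, V value) {
        store.put(key, value);
    }

    // Timer path: scan the whole store once, forward every entry in key
    // order, then empty the store. Returns the number of entries forwarded.
    int onWallClockPunctuate() {
        int forwarded = 0;
        for (Map.Entry<K, V> entry : store.entrySet()) {
            downstream.accept(entry.getKey(), entry.getValue());
            forwarded++;
        }
        store.clear();
        return forwarded;
    }
}
```

In a real topology the timer path would be driven by a wall-clock punctuation and the store would be a state store, so the cost of the full scan is paid once per interval rather than once per record.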

The one complication is that the wall-clock punctuation currently blocks the StreamThread, so you need to have some sense of how long the flush will take (observed empirically) and make sure that you set {{max.poll.interval.ms}} with enough headroom that you won't drop out of the group.
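
As a rough sketch of sizing that setting: the helper below derives a {{max.poll.interval.ms}} value from a measured flush duration. Only the config key comes from the discussion above; {{PollIntervalHeadroom}}, {{withHeadroom}} and the multiplier are hypothetical, and the right amount of headroom is something you would have to determine for your own workload.

```java
import java.util.Properties;

// Hypothetical helper: derive max.poll.interval.ms from an empirically
// observed flush duration plus a safety multiplier (both are assumptions).
final class PollIntervalHeadroom {
    static Properties withHeadroom(long observedFlushMs, double headroomFactor) {
        // The config is an int number of milliseconds, so fail loudly on overflow.
        int intervalMs = Math.toIntExact((long) Math.ceil(observedFlushMs * headroomFactor));
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", Integer.toString(intervalMs));
        return props;
    }
}
```

For example, {{withHeadroom(120_000, 1.5)}} would set the interval to 180000 ms for a flush that has been observed to take up to two minutes.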

This is bleeding more into the domain of KIP-424, which does seem more like what [~maatdeamon] needs (just agreeing with the discussion so far). I don't think there was any technical impediment to implementing that one; it was just that the KIP discussion petered out (which happens sometimes). I guess, building on my last few paragraphs, _if_ we had wall-clock-based suppression, _then_ it might make more sense to offer on-disk suppression in addition to in-memory, as at least the (wall-clock + on-disk) configuration could be performant. But it would need much more design. I'm still unsure whether on-disk suppression is really a good idea to implement in the DSL.

A final thought worth mentioning in this discussion is that KIP-557 ( [https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams] ) will go a long way toward dropping unnecessary updates. This isn't the same thing as suppressing intermediate results, but it will help a great deal to at least drop idempotent updates early in the topology and not even have to suppress them at the end.

Thanks,

-John

> KIP-328: Add spill-to-disk for Suppression
> ------------------------------------------
>
>                 Key: KAFKA-7224
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7224
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Priority: Major
>
> As described in [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables]
> Following on KAFKA-7223, implement the spill-to-disk buffering strategy.


