Posted to jira@kafka.apache.org by "Richard Yu (Jira)" <ji...@apache.org> on 2019/10/20 21:26:00 UTC

[jira] [Issue Comment Deleted] (KAFKA-8769) Consider computing stream time independently per key

     [ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Richard Yu updated KAFKA-8769:
------------------------------
    Comment: was deleted

(was: [~vvcephei] Just a thought. Would the low-traffic problem be made significantly worse by per-key stream time tracking?

I think it would be only marginally worse, at most, than the situation we have now. To get around the problem, the eviction logic in the suppression buffers could stay exactly as it is today, so that it does not get any worse. In other words: yes, each individual key would have its own stream time, but when we evict records, we evict based on the maximum stream time across all keys. The low-traffic problem is therefore not made worse; it stays the same.

I think this is acceptable. WDYT?)
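
For illustration, a minimal sketch of the idea above (the class and method names are hypothetical, not actual Streams internals): stream time is tracked per key for grace/lateness decisions, but suppression-buffer eviction still uses the maximum stream time across all keys, so the buffer behaves exactly as it does today in low-traffic cases.

{noformat}
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
class PerKeyStreamTimeTracker {
    private final Map<String, Long> perKeyStreamTime = new HashMap<>();
    private long maxStreamTime = Long.MIN_VALUE;

    void observe(final String key, final long timestamp) {
        // each key advances its own stream time
        perKeyStreamTime.merge(key, timestamp, Math::max);
        // the global maximum still advances exactly as it does today
        maxStreamTime = Math.max(maxStreamTime, timestamp);
    }

    // per-key stream time: used for grace-period / lateness decisions
    long streamTimeFor(final String key) {
        return perKeyStreamTime.getOrDefault(key, Long.MIN_VALUE);
    }

    // global stream time: used for suppression-buffer eviction, unchanged
    long evictionStreamTime() {
        return maxStreamTime;
    }
}
{noformat}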

> Consider computing stream time independently per key
> ----------------------------------------------------
>
>                 Key: KAFKA-8769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8769
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Priority: Major
>              Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the highest timestamp observed by stateful operators, per partition. This concept of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IoT applications, it's common for sensors to save up quite a bit of data and then dump it all at once into the topic. See https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a time into the topic. This results in a pattern in which, when reading a single partition, the operators observe a lot of consecutive records for one key that increase in timestamp for 24 hours, then a bunch of consecutive records for another key that are also increasing in timestamp over the same 24 hour period. With our current stream-time definition, this means that the partition's stream time increases while reading the first key's data, but then stays paused while reading the second key's data, since the second batch of records all have timestamps in the "past".
> E.g.:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
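> For reference, a minimal sketch of the current per-partition reckoning (illustrative only, not the actual Streams internals): stream time is just a running maximum of the observed timestamps, which is why B's records in the example above all see stream time 3.
>
> {noformat}
> // Illustrative only: conceptually one instance per partition.
> class PartitionStreamTime {
>     private long streamTime = Long.MIN_VALUE;
>
>     long observe(final long recordTimestamp) {
>         // stream time never moves backwards, so late records cannot lower it
>         streamTime = Math.max(streamTime, recordTimestamp);
>         return streamTime;
>     }
> }
> {noformat}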
> This pattern results in an unfortunate compromise in which folks are required to set the grace period to the max expected time skew, for example 24 hours, or Streams will just drop the second key's data (since it is late). But this means that if they want to use Suppression for "final results", they have to wait 24 hours for the result.
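> To make the tradeoff concrete, here is a hedged Kafka Streams DSL sketch (topic names, serdes, and window size are assumptions): with a 24-hour grace period, suppress(untilWindowCloses(...)) cannot emit a window's final result until 24 hours after the window ends.
>
> {noformat}
> import java.time.Duration;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.Suppressed;
> import org.apache.kafka.streams.kstream.TimeWindows;
>
> // Illustrative topology; topic names are hypothetical.
> public class FinalResultsExample {
>     public static Topology build() {
>         final StreamsBuilder builder = new StreamsBuilder();
>         builder.stream("sensor-readings", Consumed.with(Serdes.String(), Serdes.Double()))
>                .groupByKey()
>                // grace must cover the worst-case skew between sensors: 24 hours
>                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofHours(24)))
>                .count()
>                // so "final results" from suppression are delayed by up to 24 hours
>                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>                .toStream()
>                .to("sensor-counts");
>         return builder.build();
>     }
> }
> {noformat}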
> This tradeoff is not strictly necessary, though, because each key represents a logically independent sequence of events. Tracking by partition is simply convenient, but typically not logically meaningful. That is, the partitions are just physically independent sequences of events, so it's convenient to track stream time at this granularity. It would be just as correct, and more useful for IoT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the testing/low-traffic problem. This is the opposite issue, where a partition doesn't get enough traffic to advance stream time and results remain "stuck" in the suppression buffers. We can provide some mechanism to force the advancement of time across all partitions, for use in testing when you want to flush out all results, or in production when some topic is low volume. We shouldn't consider tracking time _more_ granularly until this problem is solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)