Posted to dev@kafka.apache.org by "John Roesler (JIRA)" <ji...@apache.org> on 2019/08/08 15:18:00 UTC

[jira] [Created] (KAFKA-8769) Consider computing stream time independently per key

John Roesler created KAFKA-8769:
-----------------------------------

             Summary: Consider computing stream time independently per key
                 Key: KAFKA-8769
                 URL: https://issues.apache.org/jira/browse/KAFKA-8769
             Project: Kafka
          Issue Type: Improvement
            Reporter: John Roesler


Currently, Streams uses a concept of "stream time", which is computed as the highest timestamp observed by stateful operators, per partition. This concept of time backs grace period, retention time, and suppression.

For use cases in which data is produced to topics in roughly chronological order (as in database change capture), this reckoning is fine.

Some use cases have a different pattern, though. For example, in IoT applications, it's common for sensors to save up quite a bit of data and then dump it all at once into the topic. See https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware for a concrete example of the use case.

I have heard of cases where each sensor dumps 24 hours' worth of data at a time into the topic. When reading a single partition, the operators then observe many consecutive records for one key whose timestamps increase over a 24-hour period, followed by many consecutive records for another key whose timestamps increase over the same 24-hour period. With our current stream-time definition, the partition's stream time advances while reading the first key's data, but then stops advancing while reading the second key's data, since the second batch of records all have timestamps in the "past".

E.g.:
{noformat}
A@t0 (stream time: 0)
A@t1 (stream time: 1)
A@t2 (stream time: 2)
A@t3 (stream time: 3)
B@t0 (stream time: 3)
B@t1 (stream time: 3)
B@t2 (stream time: 3)
B@t3 (stream time: 3)
{noformat}
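
To make the trace above concrete, here is a minimal sketch of the partition-level reckoning: stream time is just the running maximum of the timestamps seen so far. The class and its Event type are hypothetical names used only for illustration, not actual Streams classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Kafka Streams.
class PartitionStreamTime {

    // Minimal stand-in for a record read from one partition.
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Returns the partition-level stream time observed after each event,
    // i.e. the highest timestamp seen so far, regardless of key.
    static List<Long> streamTimes(List<Event> events) {
        List<Long> result = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (Event e : events) {
            streamTime = Math.max(streamTime, e.timestamp);
            result.add(streamTime);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("A", 0), new Event("A", 1), new Event("A", 2), new Event("A", 3),
            new Event("B", 0), new Event("B", 1), new Event("B", 2), new Event("B", 3));
        // prints [0, 1, 2, 3, 3, 3, 3, 3]
        System.out.println(streamTimes(events));
    }
}
```

Stream time never moves while key B's records are read, because every one of them is at or below the maximum already set by key A.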

This pattern results in an unfortunate compromise: folks are required to set the grace period to the max expected time skew, for example 24 hours, or Streams will just drop the second key's data (since it is late). However, this means that if they want to use Suppression for "final results", they have to wait 24 hours for the result.
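
The effect of the grace period on the second key's data can be sketched as follows. This is a deliberately simplified model: it treats a record as dropped when its timestamp plus the grace period is below the current partition stream time, and it ignores windows entirely, so it is not the exact rule Streams applies. All names are hypothetical.

```java
import java.util.List;

// Hypothetical illustration only; a simplified lateness model, not Streams code.
class GraceDropSketch {

    // Minimal stand-in for a record read from one partition.
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Counts the records that would be dropped as late under partition-level
    // stream time, assuming a record is late when timestamp + grace < streamTime.
    static int countDropped(List<Event> events, long grace) {
        long streamTime = Long.MIN_VALUE;
        int dropped = 0;
        for (Event e : events) {
            streamTime = Math.max(streamTime, e.timestamp);
            if (e.timestamp + grace < streamTime) {
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("A", 0), new Event("A", 1), new Event("A", 2), new Event("A", 3),
            new Event("B", 0), new Event("B", 1), new Event("B", 2), new Event("B", 3));
        System.out.println(countDropped(events, 0)); // prints 3
        System.out.println(countDropped(events, 3)); // prints 0
    }
}
```

Under this model, nothing is dropped only once the grace period covers the full skew between the two keys (3 here, 24 hours in the sensor example), which is exactly the delay that Suppression then imposes on final results.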

This tradeoff is not strictly necessary, though, because each key represents a logically independent sequence of events. Partitions are just physically independent sequences of events, so tracking stream time at that granularity is convenient, but typically not logically meaningful. It would be just as correct, and more useful for IoT-like use cases, to track time independently for each key.
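
As a rough sketch of what per-key tracking could look like, the running maximum can be kept in a map keyed by record key instead of in a single field per partition. This is only an illustration of the idea with hypothetical names; it says nothing about how Streams would store or bound this state in practice.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not a proposed Streams API.
class PerKeyStreamTime {

    // Highest timestamp observed so far, tracked separately for each key.
    private final Map<String, Long> timeByKey = new HashMap<>();

    // Records an observation and returns the stream time for that key.
    long observe(String key, long timestamp) {
        return timeByKey.merge(key, timestamp, Long::max);
    }

    public static void main(String[] args) {
        PerKeyStreamTime tracker = new PerKeyStreamTime();
        String[] keys = {"A", "A", "A", "A", "B", "B", "B", "B"};
        long[] timestamps = {0, 1, 2, 3, 0, 1, 2, 3};
        for (int i = 0; i < keys.length; i++) {
            long streamTime = tracker.observe(keys[i], timestamps[i]);
            System.out.println(keys[i] + "@t" + timestamps[i] + " (stream time: " + streamTime + ")");
        }
    }
}
```

With this reckoning, key B's stream time advances from 0 to 3 on its own, so none of its records look late relative to key A.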

However, before considering this change, we need to solve the testing/low-traffic problem. This is the opposite issue, where a partition doesn't get enough traffic to advance stream time and results remain "stuck" in the suppression buffers. We can provide some mechanism to force the advancement of time across all partitions, for use in testing when you want to flush out all results, or in production when some topic is low volume. We shouldn't consider tracking time _more_ granularly until this problem is solved, since it would just make the low-traffic problem worse.


