Posted to dev@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2019/02/24 19:35:00 UTC

[jira] [Created] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

Matthias J. Sax created KAFKA-7994:
--------------------------------------

             Summary: Improve Stream-Time for rebalances and restarts
                 Key: KAFKA-7994
                 URL: https://issues.apache.org/jira/browse/KAFKA-7994
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: Matthias J. Sax


We compute a per-partition partition-time as the maximum timestamp over all records processed so far. Furthermore, we use partition-time to compute stream-time for each task as the maximum over all partition-times (for all partitions assigned to the task). This stream-time is used to decide whether to process out-of-order records or to drop them if they are late (i.e., timestamp < stream-time - grace-period).
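
To make the two definitions concrete, here is a minimal sketch in plain Java. It is not the Kafka Streams implementation; the class `StreamTimeTracker`, its method names, and the use of string partition names are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration, not Kafka Streams code.
public class StreamTimeTracker {
    static final long UNKNOWN = -1L;

    // partition name -> maximum record timestamp seen so far on that partition
    private final Map<String, Long> partitionTimes = new HashMap<>();
    private final long gracePeriodMs;

    public StreamTimeTracker(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    // partition-time only moves forward: keep the maximum timestamp per partition
    public void observe(String partition, long timestamp) {
        partitionTimes.merge(partition, timestamp, Math::max);
    }

    public long partitionTime(String partition) {
        return partitionTimes.getOrDefault(partition, UNKNOWN);
    }

    // stream-time of the task: maximum over all of its partition-times
    public long streamTime() {
        long max = UNKNOWN;
        for (long t : partitionTimes.values()) {
            max = Math.max(max, t);
        }
        return max;
    }

    // a record is late if timestamp < stream-time - grace-period
    public boolean isLate(long timestamp) {
        long streamTime = streamTime();
        return streamTime != UNKNOWN && timestamp < streamTime - gracePeriodMs;
    }

    public static void main(String[] args) {
        StreamTimeTracker tracker = new StreamTimeTracker(5L);
        tracker.observe("p0", 0L);
        tracker.observe("p1", 5L);
        tracker.observe("p0", 11L);
        System.out.println("stream-time = " + tracker.streamTime());
        System.out.println("is 2 late?  = " + tracker.isLate(2L));
    }
}
```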

During rebalances and restarts, stream-time is initialized as UNKNOWN (i.e., -1) for tasks that are newly created (or migrated). In effect, we forget the current stream-time in this case, which may lead to non-deterministic behavior: if we stop processing right before a late record, that record would be dropped if we continued processing, but is not dropped after a rebalance/restart. Let's look at an example with a grace period of 5ms for a tumbling window of 5ms, and the following records (timestamps in parentheses):

 
{code:java}
r1(0) r2(5) r3(11) r4(2){code}
In the example, stream-time advances as 0, 5, 11, 11, and thus record `r4` is dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or rebalance after processing `r3` but before processing `r4`, we would reinitialize stream-time as -1, and thus would process `r4` on restart/after rebalance. The problem is that, from a global point of view, stream-time now advances differently: 0, 5, 11, 2 (i.e., it goes backwards).
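
The difference between the two runs can be reproduced with a small simulation. This is only a sketch of the scenario described above: the class `RestartDemo`, the method `droppedRecords`, and the `restartBefore` parameter are hypothetical names, and a single partition is assumed so that partition-time and stream-time coincide.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the example, not Kafka Streams code.
public class RestartDemo {
    static final long UNKNOWN = -1L;
    static final long GRACE_MS = 5L;

    // Processes the timestamps in order and returns those dropped as late.
    // If restartBefore >= 0, stream-time is reset to UNKNOWN right before
    // the record at that index, mimicking a restart or rebalance.
    static List<Long> droppedRecords(long[] timestamps, int restartBefore) {
        List<Long> dropped = new ArrayList<>();
        long streamTime = UNKNOWN;
        for (int i = 0; i < timestamps.length; i++) {
            if (i == restartBefore) {
                streamTime = UNKNOWN; // current stream-time is forgotten
            }
            long ts = timestamps[i];
            streamTime = Math.max(streamTime, ts);
            if (ts < streamTime - GRACE_MS) {
                dropped.add(ts);
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        long[] records = {0L, 5L, 11L, 2L}; // r1(0) r2(5) r3(11) r4(2)
        System.out.println("no restart:        " + droppedRecords(records, -1));
        System.out.println("restart before r4: " + droppedRecords(records, 3));
        System.out.println("restart before r3: " + droppedRecords(records, 2));
    }
}
```

Without a restart, and with a restart before `r3`, the record `r4` is dropped; only the restart directly before `r4` lets it through.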

 

Note that this is a corner case: if we stopped processing one record earlier, i.e., after processing `r2` but before processing `r3`, stream-time would advance correctly from a global point of view.

A potential fix would be to store the latest observed partition-time in the metadata of committed offsets. This way, on restart/rebalance we can re-initialize time correctly.
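
One way this idea could look is sketched below. Kafka's consumer API lets a committed offset carry a free-form metadata string (`OffsetAndMetadata`); to keep the example self-contained, the class `CommittedOffset` here is a hypothetical stand-in for it. Encoding the partition-time as a decimal string is also an assumption for illustration, not the format an actual fix would necessarily use.

```java
// Hypothetical sketch of the proposed fix, not Kafka Streams code.
public class PartitionTimeMetadata {
    static final long UNKNOWN = -1L;

    // Stand-in for a committed offset that carries a metadata string.
    static final class CommittedOffset {
        final long offset;
        final String metadata;

        CommittedOffset(long offset, String metadata) {
            this.offset = offset;
            this.metadata = metadata;
        }
    }

    // On commit: store the latest observed partition-time alongside the offset.
    static CommittedOffset commit(long nextOffset, long partitionTime) {
        return new CommittedOffset(nextOffset, Long.toString(partitionTime));
    }

    // On restart/rebalance: recover partition-time, falling back to UNKNOWN
    // when no usable metadata was committed.
    static long restorePartitionTime(CommittedOffset committed) {
        if (committed == null || committed.metadata == null || committed.metadata.isEmpty()) {
            return UNKNOWN;
        }
        try {
            return Long.parseLong(committed.metadata);
        } catch (NumberFormatException e) {
            return UNKNOWN;
        }
    }

    public static void main(String[] args) {
        long gracePeriodMs = 5L;
        // Commit after processing r3(11); the next record to read is r4 at offset 3.
        CommittedOffset committed = commit(3L, 11L);

        // After a restart, time is re-initialized from the committed metadata.
        long streamTime = restorePartitionTime(committed);
        long r4 = 2L;
        boolean late = streamTime != UNKNOWN && r4 < streamTime - gracePeriodMs;
        System.out.println("restored time = " + streamTime + ", r4 late = " + late);
    }
}
```

With the time restored from the commit, `r4` is compared against 11 rather than -1, so the decision to drop it is the same as in an uninterrupted run.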



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)