Posted to dev@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2017/05/01 07:30:04 UTC

[jira] [Comment Edited] (KAFKA-5144) MinTimestampTracker does not correctly add timestamps lower than the current max

    [ https://issues.apache.org/jira/browse/KAFKA-5144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990619#comment-15990619 ] 

Matthias J. Sax edited comment on KAFKA-5144 at 5/1/17 7:29 AM:
----------------------------------------------------------------

This is intended behavior. {{MinTimestampTracker}} tracks the minimum timestamp of the non-processed records in the buffer. The undocumented usage pattern is that {{addElement()}} and {{removeElement()}} are (and must be) called in the same order. Your tests don't follow this pattern. The logic tracks the current partition time as the minimum of whatever is in the buffer, which also accounts for out-of-order records (cf. KAFKA-3514).

Example: We get a batch of records with timestamps {{5, 10, 15, 12, 20}} and add them consecutively to the timestamp tracker. When we process those records one by one, we go through the following steps:

 - process {{5}}, queue {{10, 15, 12, 20}}, partition time {{5}}
 - process {{10}}, queue {{15, 12, 20}}, partition time {{10}}
 - process {{15}}, queue {{12, 20}}, partition time {{12}} (!! current minimum is {{12}}, not {{15}} !!)
 - process {{12}}, queue {{20}}, partition time {{12}}
 - process {{20}}, queue empty, partition time {{20}}
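
As a rough illustration of the intended semantics only (not of how {{MinTimestampTracker}} computes it), the partition time at each step above is the minimum over the record being processed and everything still queued behind it. The class and method names below are hypothetical and exist only for this sketch:

```java
import java.util.Arrays;

final class PartitionTimeByScan {
    // Partition time while processing record i = minimum timestamp among
    // record i and all records still queued behind it (a suffix minimum).
    static long[] partitionTimes(long[] timestamps) {
        long[] result = new long[timestamps.length];
        long min = Long.MAX_VALUE;
        // Walk backwards so each position sees the minimum of its suffix.
        for (int i = timestamps.length - 1; i >= 0; i--) {
            min = Math.min(min, timestamps[i]);
            result[i] = min;
        }
        return result;
    }

    public static void main(String[] args) {
        long[] batch = {5, 10, 15, 12, 20};
        // prints [5, 10, 12, 12, 20]
        System.out.println(Arrays.toString(partitionTimes(batch)));
    }
}
```

This brute-force scan needs the whole batch up front; the tracker reaches the same minimum incrementally as records are added and removed.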

The tracker will have the following states when adding the records one by one (this happens before processing begins):
 - add {{5}}: {{5}}
 - add {{10}}: {{5, 10}}
 - add {{15}}: {{5, 10, 15}}
 - add {{12}}: {{5, 10, 12}} (!! this is the behavior that is not a bug !!)
 - add {{20}}: {{5, 10, 12, 20}}
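
The add-side states listed above can be reproduced with a small sketch that keeps only an ascending run of timestamps. This is a simplified stand-in, not the Kafka source: it stores bare {{long}} values, the class name is hypothetical, and dropping ties (using {{>=}}) is an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class AscendingAddSketch {
    // Tracked timestamps, always in ascending order from head to tail.
    private final Deque<Long> ascending = new ArrayDeque<>();

    void add(long timestamp) {
        // Drop every tracked timestamp that is >= the new one: such an entry
        // can no longer be the minimum once the new, smaller record is buffered.
        while (!ascending.isEmpty() && ascending.peekLast() >= timestamp) {
            ascending.removeLast();
        }
        ascending.addLast(timestamp);
    }

    String state() {
        return ascending.toString();
    }

    public static void main(String[] args) {
        AscendingAddSketch tracker = new AscendingAddSketch();
        for (long ts : new long[]{5, 10, 15, 12, 20}) {
            tracker.add(ts);
            System.out.println("add " + ts + ": " + tracker.state());
        }
        // last line printed: add 20: [5, 10, 12, 20]
    }
}
```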

During processing, we {{poll}} the head record from the queue and call {{removeElement}} on the tracker afterwards. Thus, we get (note that we start with tracker state {{5, 10, 12, 20}}):
 - poll from queue {{5}}, tracker after remove {{10, 12, 20}}
 - poll from queue {{10}}, tracker after remove {{12, 20}}
 - poll from queue {{15}}, tracker after remove {{12, 20}} (!! as we call {{removeElement(15)}}, we keep {{12}}; this allows us to use {{12}} for two records !!)
 - poll from queue {{12}}, tracker after remove {{20}}
 - poll from queue {{20}}, tracker after remove empty

Note that the call to {{timeTracker.get()}} ({{RecordQueue}} L124) happens after {{timeTracker.removeElement(elem)}} ({{RecordQueue}} L120). This is also correct, as we advance "partition time" in this step, but only use it _after_ the current record has been fully processed. With that ordering, the steps are:

 - process {{5}}, queue {{10, 15, 12, 20}}, tracker after remove {{10, 12, 20}}, next partition time {{10}}
 - process {{10}}, queue {{15, 12, 20}}, tracker after remove {{12, 20}}, next partition time {{12}}
 - process {{15}}, queue {{12, 20}}, tracker after remove {{12, 20}}, next partition time {{12}}
 - process {{12}}, queue {{20}}, tracker after remove {{20}}, next partition time {{20}}
 - process {{20}}, queue empty, tracker after remove empty, next partition time {{20}} (from {{lastKnownTime}})
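
The whole walk-through (add everything, then poll, remove, and read the tracker) can be sketched as below. This is a simplified model under stated assumptions, not the actual {{RecordQueue}} or {{MinTimestampTracker}} code: {{Rec}}, {{MinTracker}}, and {{nextPartitionTimes}} are hypothetical names, the initial {{-1}} "unknown" marker is assumed, and the details of when {{lastKnownTime}} is updated are inferred from the description above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class TrackerWalkthrough {
    /** Hypothetical stand-in for a buffered record; only the timestamp matters here. */
    static final class Rec {
        final long timestamp;
        Rec(long timestamp) { this.timestamp = timestamp; }
    }

    static final class MinTracker {
        private final Deque<Rec> ascending = new ArrayDeque<>();
        private long lastKnownTime = -1L; // assumed marker for "not known yet"

        void addElement(Rec rec) {
            // Keep only an ascending run: drop tail entries that are >= the new record.
            while (!ascending.isEmpty() && ascending.peekLast().timestamp >= rec.timestamp) {
                ascending.removeLast();
            }
            ascending.addLast(rec);
        }

        void removeElement(Rec rec) {
            // Identity check: drop the head only if it is this very record.
            // A record that was already displaced on add (like 15) changes nothing.
            if (ascending.peekFirst() == rec) {
                ascending.removeFirst();
            }
            if (ascending.isEmpty()) {
                lastKnownTime = rec.timestamp;
            }
        }

        long get() {
            Rec head = ascending.peekFirst();
            return head == null ? lastKnownTime : head.timestamp;
        }
    }

    // Adds all records first, then polls them in the same order and reads
    // the tracker after each removeElement call.
    static List<Long> nextPartitionTimes(long... timestamps) {
        Deque<Rec> queue = new ArrayDeque<>();
        MinTracker tracker = new MinTracker();
        for (long ts : timestamps) {
            Rec rec = new Rec(ts);
            queue.addLast(rec);
            tracker.addElement(rec);
        }
        List<Long> result = new ArrayList<>();
        while (!queue.isEmpty()) {
            Rec rec = queue.pollFirst();
            tracker.removeElement(rec);
            result.add(tracker.get());
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [10, 12, 12, 20, 20]
        System.out.println(nextPartitionTimes(5, 10, 15, 12, 20));
    }
}
```

The sketch compares records by identity rather than by timestamp value, so that two distinct records sharing a timestamp are not confused when one of them is removed.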



was (Author: mjsax):
This is intended behavior. {{MinTimestampTracker}} tracks the minimum timestamp of the non-processed records in the buffer. The undocumented usage pattern is that {{addElement()}} and {{removeElement()}} are (and must be) called in the same order. Your tests don't follow this pattern. The logic tracks the current partition time as the minimum of whatever is in the buffer, which also accounts for out-of-order records (cf. KAFKA-3514).

Example: We get a batch of records with timestamps {{5, 10, 15, 12, 20}} and add them consecutively to the timestamp tracker. When we process those records one by one, we go through the following steps (note, I am not sure if "partition time" in this example is correct; cf. below):

 - process {{5}}, queue {{10, 15, 12, 20}}, partition time {{5}}
 - process {{10}}, queue {{15, 12, 20}}, partition time {{10}}
 - process {{15}}, queue {{12, 20}}, partition time {{12}} (!! current minimum is {{12}}, not {{15}} !!)
 - process {{12}}, queue {{20}}, partition time {{12}}
 - process {{20}}, queue empty, partition time {{20}}

The tracker will have the following states when adding the records one by one (this happens before processing begins):
 - add {{5}}: {{5}}
 - add {{10}}: {{5, 10}}
 - add {{15}}: {{5, 10, 15}}
 - add {{12}}: {{5, 10, 12}} (!! this is the behavior that is not a bug !!)
 - add {{20}}: {{5, 10, 12, 20}}

During processing, we {{poll}} the head record from the queue and call {{removeElement}} on the tracker afterwards. Thus, we get (note that we start with tracker state {{5, 10, 12, 20}}):
 - poll from queue {{5}}, tracker after remove {{10, 12, 20}}
 - poll from queue {{10}}, tracker after remove {{12, 20}}
 - poll from queue {{15}}, tracker after remove {{12, 20}} (!! as we call {{removeElement(15)}} we keep {{12}}, this allows us to use {{12}} for two records)
 - poll from queue {{12}}, tracker after remove {{20}}
 - poll from queue {{20}}, tracker after remove empty

However, I am not sure if we actually advance "partition time" as intended -- we might want to call {{timeTracker.get()}} ({{RecordQueue}} L124) before {{timeTracker.removeElement(elem)}} ({{RecordQueue}} L120) -- it seems the "partition time" in my example above is not what Streams computes atm (even if the example seems to follow the intended "partition time"): if I am not wrong, Streams would give:

 - process {{5}}, queue {{10, 15, 12, 20}}, tracker after remove {{10, 12, 20}}, partition time {{10}}
 - process {{10}}, queue {{15, 12, 20}}, tracker after remove {{12, 20}}, partition time {{12}}
 - process {{15}}, queue {{12, 20}}, tracker after remove {{12, 20}}, partition time {{12}}
 - process {{12}}, queue {{20}}, tracker after remove {{20}}, partition time {{20}}
 - process {{20}}, queue empty, tracker after remove empty, partition time {{20}} (from {{lastKnownTime}})


> MinTimestampTracker does not correctly add timestamps lower than the current max
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-5144
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5144
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: Michal Borowiecki
>            Assignee: Michal Borowiecki
>
> When adding elements MinTimestampTracker removes all existing elements greater than the added element.
> Perhaps I've missed something and this is intended behaviour but I can't find any evidence for that in comments or tests.


