Posted to users@kafka.apache.org by "Matthias J. Sax" <mj...@apache.org> on 2021/04/07 21:21:32 UTC

Re: Kafka Streams - Out of Order Handling

Sorry for the late reply...


> I only see issues of out of order data in my re-partitioned topic as a result of a rebalance happening.

If you re-partition, you may actually see out-of-order data even if
there is no rebalance. In the end, during repartitioning you have
multiple upstream writers for the repartition topic and thus interleaved
writes per partition.

Maybe it's not an issue during regular processing for your use case, as
your throughput seems to be tiny.
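
For what it's worth, a small sketch of how such out-of-order arrivals could be
observed during regular processing, using the (pre-3.0) Processor API; the
class name and the String/String types are placeholders, and you would hook it
in after the repartition step via process() or Topology#addProcessor():

    import org.apache.kafka.streams.processor.AbstractProcessor;

    // Sketch only: logs whenever a record's timestamp is older than the
    // highest timestamp this task has seen so far, i.e. out-of-order data
    // per partition of the repartition topic.
    public class OutOfOrderLogger extends AbstractProcessor<String, String> {

        private long maxTimestampSeen = Long.MIN_VALUE;

        @Override
        public void process(final String key, final String value) {
            final long ts = context().timestamp(); // timestamp-extractor time of this record
            if (ts < maxTimestampSeen) {
                System.out.printf("out-of-order record for key %s: %d ms behind%n",
                                  key, maxTimestampSeen - ts);
            } else {
                maxTimestampSeen = ts;
            }
            context().forward(key, value);
        }
    }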


> I believe all stream threads across all app instances will pause consuming whilst the rebalance is worked through.

Not really. (1) If a thread dies, it takes some time to detect that the
thread died, and thus all other threads continue to process until a
rebalance is even started. (2) With incremental rebalancing, partitions
that are not re-assigned keep being processed throughout a rebalance.


> but am I right in thinking that one streams app (or at least some of its stream threads) will have to wait for state to be synced from the changelog topic?

That is correct. If a thread gets a new task assigned and needs to
recover state, it pauses processing for all of its partitions/tasks
until the restore has finished. However, this pausing of partitions/tasks
happens only on a per-thread basis. Threads, even from the same app
instance, are totally agnostic to each other.
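
If you want to observe those per-thread restore pauses, a restore listener can
be attached to the KafkaStreams instance; a rough sketch (streams is assumed to
be your existing KafkaStreams object):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.StateRestoreListener;

    streams.setGlobalStateRestoreListener(new StateRestoreListener() {
        @Override
        public void onRestoreStart(TopicPartition tp, String store, long start, long end) {
            System.out.printf("restoring %s for %s: %d records to go%n", store, tp, end - start);
        }

        @Override
        public void onBatchRestored(TopicPartition tp, String store, long batchEnd, long numRestored) {
            // no-op; called after each restored batch
        }

        @Override
        public void onRestoreEnd(TopicPartition tp, String store, long totalRestored) {
            System.out.printf("restore of %s for %s done (%d records)%n", store, tp, totalRestored);
        }
    });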

So what you describe in your example makes sense.


> If this is the case, do you think increasing standby replicas will lessen the issue? 

Standbys can reduce your recovery time, and for your low-throughput use
case may even reduce recovery time to zero. Thus, if the rebalance
itself happens quickly enough, the issue with out-of-order data may go
away (or at least should be largely mitigated).
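
For reference, standbys are enabled purely through configuration; a minimal
sketch (application id, bootstrap servers, and the single standby are
illustrative values only):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "device-aggregator");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    // Keep a warm copy of each state store on another instance so a failed-over
    // task does not have to replay the whole changelog before it can process.
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);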


-Matthias


On 3/12/21 3:40 AM, Marcus Horsley-Rai wrote:
> Thanks Matthias - that's great to know.
> 
>> Increasing the grace period should not really affect throughput, but
>> latency.
> 
> Yes, a slip of the tongue on my part, you’re right :-)
> 
> One last question if I may? I only see issues of out of order data in my re-partitioned topic as a result of a rebalance happening.
> My hypothesis is that when an instance of my streams app dies - the consumption of data from the partitions it was responsible for falls behind compared to others.
> I believe all stream threads across all app instances will pause consuming whilst the rebalance is worked through.. but am I right in thinking that one streams app (or at least some of its stream threads) will have to wait for state to be synced from the changelog topic?
> In other words - when a rebalance happens - I assume the consumer group doesn’t wait for the slowest member to be ready to consume?
> 
> To illustrate with an example:
> 	If I have 3 partitions of a single topic and three streams app instances (1 partition each)
> 	I have a producer that produces to each partition each minute on the minute
> 	Normally the timestamp of the head record is roughly the same across all three partitions. This assumes no lag ever builds up on the consumer group, and also assumes data volume and size of messages is comparable.
> 
> 	Now I kill streams app A. The rebalance protocol kicks in and gives instance B an extra partition to consume from.
>         Could there now be a bigger lag for one or both of the partitions app B is consuming from because it had to sync state store state? (Assume B has enough stream processing threads idle and the machine is specced to cope with the extra load)
>        …whereas app C, unhindered by state syncing, has potentially now produced to the through topic a record from a newer batch/time window.
> 
> If this is the case, do you think increasing standby replicas will lessen the issue?  I obviously don’t expect it to be a magic bullet, and grace period is still required in general
> 
> 
> Best Regards,
> 
> Marcus
> 
> 
> 
> 
> On Thu, Mar 11, 2021 at 1:40 AM Matthias J. Sax <mjsax@apache.org> wrote:
>> will it consider a timestamp in the body of the message, if we have implemented a custom TimeExtractor?
> 
> Yes.
> 
> 
>> Or, which I feel is more likely - does TimeExtractor stream time only apply later on once deserialisation has happened?
> 
> Well, the extractor is applied after deserialization, but we deserialize
> each partition's head record precisely so that we can apply the timestamp
> extractor; i.e., deserialization happens when a record becomes the "head record".
> 
> Cf.
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
> 
> 
>> the accuracy of the aggregates may have to come second to the throughput.
> 
> Increasing the grace period should not really affect throughput, but
> latency.
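
For reference, in the DSL the grace period is part of the window definition; a
minimal sketch (window size and grace are example values, and byLocation is an
assumed KStream<String, Double> keyed by location-id):

    import java.time.Duration;
    import org.apache.kafka.streams.kstream.*;

    KTable<Windowed<String>, Double> perMinuteSums = byLocation
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1))      // window size
                                   .grace(Duration.ofMinutes(2)))  // longer grace: later results,
            .reduce(Double::sum)                                   // not lower throughput
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));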
> 
> 
> 
> -Matthias
> 
> 
> On 3/10/21 3:37 PM, Marcus Horsley-Rai wrote:
>>
>> Thanks for your reply Matthias, and really great talks :-)
>>
>> You’re right that I only have one input topic - though it does have 20 partitions.
>> The pointer to max.task.idle.ms cleared something up for me; I read the following line from Kafka docs but couldn’t find what configuration they were referring to.
>>
>>>      Within a stream task that may be processing multiple topic-partitions, if users configure the application to not wait for all partitions to contain some buffered data and pick from the partition with the smallest timestamp to process the next record, then later on when some records are fetched for other topic-partitions, their timestamps may be smaller than those processed records fetched from another topic-partition.
>>>
>>       
>> When streams is checking the head record of each partition to pick the lowest timestamp - will it consider a timestamp in the body of the message, if we have implemented a custom TimeExtractor?
>> Or, which I feel is more likely - does TimeExtractor stream time only apply later on once deserialisation has happened?
>> The reason I ask is because our producer code doesn’t manually set the timestamp in ProducerRecord, only in the JSON body. That may be something we can look to change.
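
For illustration, a custom extractor along these lines would feed the body
timestamp into stream time; DeviceResult, getTimestamp(), and the registration
shown in the trailing comment are assumptions, not the actual code:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class DeviceResultTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            Object value = record.value();            // already deserialized at this point
            if (value instanceof DeviceResult) {      // DeviceResult is a placeholder type
                return ((DeviceResult) value).getTimestamp(); // epoch millis from the JSON body
            }
            return partitionTime;  // fall back to the partition's previous highest timestamp
        }
    }

    // Registered via:
    // props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
    //           DeviceResultTimestampExtractor.class);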
>>
>> As you say, I fear adjusting grace time may be my only solution; however because this is a real-time monitoring application…the accuracy of the aggregates may have to come second to the throughput.
>>
>> Many thanks,
>>
>> Marcus
>>
>>
>> On 2021/03/09 08:21:22, "Matthias J. Sax" <m...@apache.org> wrote:
>>> In general, Kafka Streams tries to process messages in timestamp order,
>>> i.e., oldest message first. However, Kafka Streams always needs to process
>>> messages in offset order per partition, and thus timestamp
>>> synchronization is only applied across records from different topics (e.g.,
>>> if you join two topics).
>>>
>>> There is the config `max.task.idle.ms` to improve timestamp synchronization,
>>> but I am not sure it would help in your case, as it seems you have a
>>> single input topic.
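
For completeness, that setting is an ordinary Streams property; a one-line
sketch (props is assumed to be the application's StreamsConfig properties, and
the value is arbitrary):

    import org.apache.kafka.streams.StreamsConfig;

    // Only pays off when a task reads from more than one partition/topic (e.g. joins).
    props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);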
>>>
>>> It seems there is already out-of-order data in your input topic. Also
>>> note that your repartition step may introduce out-of-order data.
>>>
>>> As you are using a custom Processor, it is up to you to handle
>>> out-of-order data, and it seems that you may need to introduce a larger
>>> grace period. In general, it's very hard (to impossible) to know how
>>> much out-of-order data is in a topic, due to the decoupled nature of Kafka
>>> and the interleaved writes of different producers into a topic.
>>>
>>> Not sure if you could change the original partitioning to just use
>>> `location-id` to avoid the additional repartitioning step. This could
>>> help to reduce the amount of out-of-order data.
>>>
>>> For completeness, check out those Kafka Summit talks:
>>>  -
>>> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
>>>  -
>>> https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/
>>>
>>> Hope this helps.
>>>
>>> -Matthias
>>>
>>> On 3/3/21 7:03 AM, Marcus Horsley-Rai wrote:
>>>> Hi All,
>>>>
>>>> Just to give a bit of context; I have an application which is SNMP polling
>>>> a network. Each collector agent works on a 1-minute schedule, polling
>>>> device(s) and posting results to a Kafka topic.
>>>> The time a given collector publishes data can vary within a minute, but it
>>>> should never overlap with the next minute time bucket.
>>>>
>>>> The topic produced to, for argument's sake 'device-results', has multiple
>>>> partitions. The data is keyed as 'device-id|location-id'.
>>>>
>>>> I then had a requirement to aggregate the data by location; every device
>>>> result within the same location is summed, and an aggregate is output each
>>>> minute.
>>>> I'm aware the streams DSL has groupByKey/windowedBy/suppress, which is a
>>>> solution to this problem - but we found the throughput was abysmal -
>>>> probably due to the I/O performance of our virtual machine infrastructure.
>>>>
>>>> Instead we have hand-rolled something simplistic - which does the job 99%
>>>> well.
>>>>  - We use a through() to re-partition the topic to just location-id
>>>>  - We keep an object representing the current minute's aggregate in an
>>>> in-memory state store (with changelog)
>>>>  - When any device result is transformed and has a timestamp that is older
>>>> than our current window time - we output the aggregate; otherwise we update
>>>> the running sum.
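
For what it's worth, a rough sketch of how the wiring described in the list
above might look; DeviceResult, LocationAggregator, the store name, and the
topic names are all invented for illustration, serdes are left to the
configured defaults, and LocationAggregator stands for a Transformer that keeps
the running sum in the store and forwards the finished aggregate once it sees a
timestamp from a newer minute:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.state.Stores;

    StreamsBuilder builder = new StreamsBuilder();

    // In-memory store for the current minute's running sums; the changelog
    // topic is created automatically. "minute-agg" is a made-up store name.
    builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("minute-agg"),
            Serdes.String(),
            Serdes.Double()));

    KStream<String, DeviceResult> results = builder.stream("device-results");
    results.selectKey((deviceAndLocation, result) -> result.getLocationId())
           .through("device-results-by-location")                   // the re-partition step
           .transform(() -> new LocationAggregator(), "minute-agg") // emit previous minute's sum
           .to("location-aggregates");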
>>>>
>>>>  What I have noticed is that when I do a rolling restart of the
>>>> application, such as to push new code, data is dropped because of messages
>>>> being processed out of order.
>>>> I changed the code to include the equivalent of an extra minute's grace
>>>> time, but in production I see messages arriving that are > 2 min behind
>>>> the latest messages.
>>>>
>>>> I came across the documentation at
>>>> https://kafka.apache.org/23/documentation/streams/core-concepts#streams_out_of_ordering
>>>> which alluded to a possible solution.
>>>> Could anyone advise if there is a way, in code or configuration properties,
>>>> that I could better guarantee that streams prioritises the *oldest*
>>>> messages first, rather than caring about offset?
>>>>
>>>> Thanks in advance for any replies!
>>>>
>>>> Marcus
>>>>
> 

Re: Kafka Streams - Out of Order Handling

Posted by Marcus Horsley-Rai <ma...@gmail.com>.
Thanks very much for taking the time to answer, Matthias!  Very much
appreciated

All the best,

Marcus
