You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Peter Levart <pe...@gmail.com> on 2019/01/15 10:00:09 UTC
max.task.idle.ms behavior
Hello!
I'm trying to understand the behavior of Kafka Streams consumers with
regards to max.task.idle.ms configuration parameter (default 0). The
documentation says:
max.task.idle.ms Medium Maximum amount of time a stream task
will stay idle when not all of its partition buffers contain records.
Suppose an input topic to a streams application has multiple partitions
and that traffic arrives to this input topic in bursts. At first all
partitions will be filled with records and all tasks will be busy. Then
some partition will get drained first. The task with drained partition
will bet paused for max.task.idle.ms time.
Question: How often will the task with drained partition pause for
max.task.idle.ms time due to missing records in a partition?
1. Before every record?
2. At each KafkaConsumer.poll() that returns any records?
3. Just the 1st time some partition is detected to have no polled
records, but then this partition will get "blacklisted" from further
checks for pausing until it again receives a record.
For example, if the task is assigned N partitions, what is the max. # of
times the task will pause for max.task.idle.ms time when the input burst
is over:
The wishful answer is N-1 times. Is this so?
Regards, Peter
Re: max.task.idle.ms behavior
Posted by "Matthias J. Sax" <ma...@confluent.io>.
That sounds correct.
Just to avoid confusion: we don't use the term "pause" for tasks, but
"idle", because consumers can "pause/resume" partitions what is
something different.
In fact, if a task is idle the underlying empty partitions are not
paused, because we hope to get data from them before the timeout is
exceeded.
-Matthias
On 1/18/19 2:17 AM, Peter Levart wrote:
> Hi Matthias,
>
> Thank you for the clarifications. May I ask about some more detail? I'll
> try to describe the behavior as I understand it. Please correct me if
> I'm wrong...
>
> So in effect each task alternates between NORMAL and ENFORCED processing
> modes.
>
> It starts in NORMAL mode where it is performing a kind of "merge" step
> (as in merge sort) among assigned partitions where records in a
> particular partition are assumed to be already sorted by timestamp. When
> any of the partitions drains, the task pauses (and doesn't process
> messages even if they have already been received by last poll call) and
> waits for drained partitions to start delivering messages so each
> partition has a candidate message to choose from before continue-ing.
> The important point as I see is that the merging process can only
> progress in NORMAL mode when it can choose the earliest message among
> all the partitions assigned to the task. If when pausing, timeout kicks
> in, the task enters ENFORCED processing mode.
>
> In ENFORCED processing mode, all the partitions are un-paused again and
> the merging continues, but it merges a chunk of messages from a
> particular poll at a time before polling for next chunk. If at some
> point all the partitions have a candidate message in a particular polled
> chunk, it reverts back to NORMAL processing mode...
>
> Is my understanding correct?
>
> Thanks, Peter
>
> On 1/16/19 8:15 AM, Matthias J. Sax wrote:
>> The parameter applies too all three topics (input, intermediate,
>> repartitions topics) and it's a global config.
>>
>> About the blocking behavior:
>>
>> If one partitions becomes empty, all other partitions are paused() and
>> Streams only poll() for the empty partition. If no data is returned
>> within the timeout, processing of enforced for this task for all
>> partitions.
>>
>> The task will stay in the "enforced processing state" until all
>> partitions deliver data again (at the same time).
>>
>> If a second partition becomes empty, no additional delay is applied.
>>
>> Hope this answers your question.
>>
>>
>> -Matthias
>>
>>
>> On 1/15/19 2:07 AM, Peter Levart wrote:
>>> Another question about this parameter.
>>>
>>> Does that parameter apply just to input topics to the KafkaStreams
>>> topology or also to intermediate (repartitioning) topics or to
>>> intermediate topics configured with KStream.through() directive?
>>>
>>> Is it possible to control the behavior on a per-topic basis?
>>>
>>> Thanks,
>>>
>>> Peter
>>>
>>> On 1/15/19 11:00 AM, Peter Levart wrote:
>>>> Hello!
>>>>
>>>> I'm trying to understand the behavior of Kafka Streams consumers with
>>>> regards to max.task.idle.ms configuration parameter (default 0). The
>>>> documentation says:
>>>>
>>>> max.task.idle.ms Medium Maximum amount of time a stream task
>>>> will stay idle when not all of its partition buffers contain records.
>>>>
>>>> Suppose an input topic to a streams application has multiple
>>>> partitions and that traffic arrives to this input topic in bursts. At
>>>> first all partitions will be filled with records and all tasks will be
>>>> busy. Then some partition will get drained first. The task with
>>>> drained partition will bet paused for max.task.idle.ms time.
>>>>
>>>> Question: How often will the task with drained partition pause for
>>>> max.task.idle.ms time due to missing records in a partition?
>>>>
>>>> 1. Before every record?
>>>> 2. At each KafkaConsumer.poll() that returns any records?
>>>> 3. Just the 1st time some partition is detected to have no polled
>>>> records, but then this partition will get "blacklisted" from further
>>>> checks for pausing until it again receives a record.
>>>>
>>>> For example, if the task is assigned N partitions, what is the max. #
>>>> of times the task will pause for max.task.idle.ms time when the input
>>>> burst is over:
>>>>
>>>> The wishful answer is N-1 times. Is this so?
>>>>
>>>> Regards, Peter
>>>>
>
Re: max.task.idle.ms behavior
Posted by Peter Levart <pe...@gmail.com>.
Hi Matthias,
Thank you for the clarifications. May I ask about some more detail? I'll
try to describe the behavior as I understand it. Please correct me if
I'm wrong...
So in effect each task alternates between NORMAL and ENFORCED processing
modes.
It starts in NORMAL mode where it is performing a kind of "merge" step
(as in merge sort) among assigned partitions where records in a
particular partition are assumed to be already sorted by timestamp. When
any of the partitions drains, the task pauses (and doesn't process
messages even if they have already been received by last poll call) and
waits for drained partitions to start delivering messages so each
partition has a candidate message to choose from before continue-ing.
The important point as I see is that the merging process can only
progress in NORMAL mode when it can choose the earliest message among
all the partitions assigned to the task. If when pausing, timeout kicks
in, the task enters ENFORCED processing mode.
In ENFORCED processing mode, all the partitions are un-paused again and
the merging continues, but it merges a chunk of messages from a
particular poll at a time before polling for next chunk. If at some
point all the partitions have a candidate message in a particular polled
chunk, it reverts back to NORMAL processing mode...
Is my understanding correct?
Thanks, Peter
On 1/16/19 8:15 AM, Matthias J. Sax wrote:
> The parameter applies too all three topics (input, intermediate,
> repartitions topics) and it's a global config.
>
> About the blocking behavior:
>
> If one partitions becomes empty, all other partitions are paused() and
> Streams only poll() for the empty partition. If no data is returned
> within the timeout, processing of enforced for this task for all partitions.
>
> The task will stay in the "enforced processing state" until all
> partitions deliver data again (at the same time).
>
> If a second partition becomes empty, no additional delay is applied.
>
> Hope this answers your question.
>
>
> -Matthias
>
>
> On 1/15/19 2:07 AM, Peter Levart wrote:
>> Another question about this parameter.
>>
>> Does that parameter apply just to input topics to the KafkaStreams
>> topology or also to intermediate (repartitioning) topics or to
>> intermediate topics configured with KStream.through() directive?
>>
>> Is it possible to control the behavior on a per-topic basis?
>>
>> Thanks,
>>
>> Peter
>>
>> On 1/15/19 11:00 AM, Peter Levart wrote:
>>> Hello!
>>>
>>> I'm trying to understand the behavior of Kafka Streams consumers with
>>> regards to max.task.idle.ms configuration parameter (default 0). The
>>> documentation says:
>>>
>>> max.task.idle.ms Medium Maximum amount of time a stream task
>>> will stay idle when not all of its partition buffers contain records.
>>>
>>> Suppose an input topic to a streams application has multiple
>>> partitions and that traffic arrives to this input topic in bursts. At
>>> first all partitions will be filled with records and all tasks will be
>>> busy. Then some partition will get drained first. The task with
>>> drained partition will bet paused for max.task.idle.ms time.
>>>
>>> Question: How often will the task with drained partition pause for
>>> max.task.idle.ms time due to missing records in a partition?
>>>
>>> 1. Before every record?
>>> 2. At each KafkaConsumer.poll() that returns any records?
>>> 3. Just the 1st time some partition is detected to have no polled
>>> records, but then this partition will get "blacklisted" from further
>>> checks for pausing until it again receives a record.
>>>
>>> For example, if the task is assigned N partitions, what is the max. #
>>> of times the task will pause for max.task.idle.ms time when the input
>>> burst is over:
>>>
>>> The wishful answer is N-1 times. Is this so?
>>>
>>> Regards, Peter
>>>
Re: max.task.idle.ms behavior
Posted by "Matthias J. Sax" <ma...@confluent.io>.
The parameter applies too all three topics (input, intermediate,
repartitions topics) and it's a global config.
About the blocking behavior:
If one partitions becomes empty, all other partitions are paused() and
Streams only poll() for the empty partition. If no data is returned
within the timeout, processing of enforced for this task for all partitions.
The task will stay in the "enforced processing state" until all
partitions deliver data again (at the same time).
If a second partition becomes empty, no additional delay is applied.
Hope this answers your question.
-Matthias
On 1/15/19 2:07 AM, Peter Levart wrote:
> Another question about this parameter.
>
> Does that parameter apply just to input topics to the KafkaStreams
> topology or also to intermediate (repartitioning) topics or to
> intermediate topics configured with KStream.through() directive?
>
> Is it possible to control the behavior on a per-topic basis?
>
> Thanks,
>
> Peter
>
> On 1/15/19 11:00 AM, Peter Levart wrote:
>> Hello!
>>
>> I'm trying to understand the behavior of Kafka Streams consumers with
>> regards to max.task.idle.ms configuration parameter (default 0). The
>> documentation says:
>>
>> max.task.idle.ms Medium Maximum amount of time a stream task
>> will stay idle when not all of its partition buffers contain records.
>>
>> Suppose an input topic to a streams application has multiple
>> partitions and that traffic arrives to this input topic in bursts. At
>> first all partitions will be filled with records and all tasks will be
>> busy. Then some partition will get drained first. The task with
>> drained partition will bet paused for max.task.idle.ms time.
>>
>> Question: How often will the task with drained partition pause for
>> max.task.idle.ms time due to missing records in a partition?
>>
>> 1. Before every record?
>> 2. At each KafkaConsumer.poll() that returns any records?
>> 3. Just the 1st time some partition is detected to have no polled
>> records, but then this partition will get "blacklisted" from further
>> checks for pausing until it again receives a record.
>>
>> For example, if the task is assigned N partitions, what is the max. #
>> of times the task will pause for max.task.idle.ms time when the input
>> burst is over:
>>
>> The wishful answer is N-1 times. Is this so?
>>
>> Regards, Peter
>>
>
Re: max.task.idle.ms behavior
Posted by Peter Levart <pe...@gmail.com>.
Another question about this parameter.
Does that parameter apply just to input topics to the KafkaStreams
topology or also to intermediate (repartitioning) topics or to
intermediate topics configured with KStream.through() directive?
Is it possible to control the behavior on a per-topic basis?
Thanks,
Peter
On 1/15/19 11:00 AM, Peter Levart wrote:
> Hello!
>
> I'm trying to understand the behavior of Kafka Streams consumers with
> regards to max.task.idle.ms configuration parameter (default 0). The
> documentation says:
>
> max.task.idle.ms Medium Maximum amount of time a stream task
> will stay idle when not all of its partition buffers contain records.
>
> Suppose an input topic to a streams application has multiple
> partitions and that traffic arrives to this input topic in bursts. At
> first all partitions will be filled with records and all tasks will be
> busy. Then some partition will get drained first. The task with
> drained partition will bet paused for max.task.idle.ms time.
>
> Question: How often will the task with drained partition pause for
> max.task.idle.ms time due to missing records in a partition?
>
> 1. Before every record?
> 2. At each KafkaConsumer.poll() that returns any records?
> 3. Just the 1st time some partition is detected to have no polled
> records, but then this partition will get "blacklisted" from further
> checks for pausing until it again receives a record.
>
> For example, if the task is assigned N partitions, what is the max. #
> of times the task will pause for max.task.idle.ms time when the input
> burst is over:
>
> The wishful answer is N-1 times. Is this so?
>
> Regards, Peter
>