Posted to users@kafka.apache.org by Peter Levart <pe...@gmail.com> on 2019/01/15 10:00:09 UTC

max.task.idle.ms behavior

Hello!

I'm trying to understand the behavior of Kafka Streams consumers with
regard to the max.task.idle.ms configuration parameter (default 0). The
documentation says:

max.task.idle.ms     Medium     Maximum amount of time a stream task 
will stay idle when not all of its partition buffers contain records.
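
For reference, here is a minimal sketch of how such a value could be
supplied. It uses only java.util.Properties and the literal key from the
documentation line above; the application id, broker address, and the
500 ms value are placeholders, not recommendations. The kafka-streams
library also exposes the same key as StreamsConfig.MAX_TASK_IDLE_MS_CONFIG.

```java
import java.util.Properties;

public class IdleConfigSketch {
    // Builds Streams-style properties with a non-default task idle time.
    // All values other than the "max.task.idle.ms" key are hypothetical.
    static Properties streamsProps(long maxTaskIdleMs) {
        Properties props = new Properties();
        props.put("application.id", "idle-demo");          // hypothetical name
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker
        props.put("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps(500L);
        System.out.println(props.getProperty("max.task.idle.ms"));
    }
}
```

These properties would then be passed to the KafkaStreams constructor
together with the topology.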

Suppose an input topic to a streams application has multiple partitions
and that traffic arrives at this input topic in bursts. At first all
partitions will be filled with records and all tasks will be busy. Then
some partition will get drained first. The task with the drained
partition will be paused for max.task.idle.ms.

Question: How often will the task with the drained partition pause for
max.task.idle.ms due to missing records in a partition?

1. Before every record?
2. At each KafkaConsumer.poll() that returns any records?
3. Just the 1st time some partition is detected to have no polled 
records, but then this partition will get "blacklisted" from further 
checks for pausing until it again receives a record.

For example, if the task is assigned N partitions, what is the maximum
number of times the task will pause for max.task.idle.ms once the input
burst is over?

The wishful answer is N-1 times. Is this so?

Regards, Peter


Re: max.task.idle.ms behavior

Posted by "Matthias J. Sax" <ma...@confluent.io>.
That sounds correct.

Just to avoid confusion: we don't use the term "pause" for tasks, but
"idle", because consumers can "pause/resume" partitions, which is
something different.

In fact, if a task is idle, the underlying empty partitions are not
paused, because we hope to get data from them before the timeout is
exceeded.


-Matthias

On 1/18/19 2:17 AM, Peter Levart wrote:
> Hi Matthias,
> 
> Thank you for the clarifications. May I ask about some more detail? I'll
> try to describe the behavior as I understand it. Please correct me if
> I'm wrong...
> 
> So in effect each task alternates between NORMAL and ENFORCED processing
> modes.
> 
> It starts in NORMAL mode where it is performing a kind of "merge" step
> (as in merge sort) among assigned partitions where records in a
> particular partition are assumed to be already sorted by timestamp. When
> any of the partitions drains, the task pauses (and doesn't process
> messages even if they have already been received by the last poll call)
> and waits for the drained partitions to start delivering messages, so
> that each partition has a candidate message to choose from before
> continuing. The important point, as I see it, is that the merging
> process can only progress in NORMAL mode when it can choose the earliest
> message among all the partitions assigned to the task. If the timeout
> kicks in while the task is paused, it enters ENFORCED processing mode.
>
> In ENFORCED processing mode, all the partitions are un-paused again and
> the merging continues, but it merges one chunk of messages from a
> particular poll at a time before polling for the next chunk. If at some
> point all the partitions have a candidate message in a particular polled
> chunk, it reverts to NORMAL processing mode...
> 
> Is my understanding correct?
> 
> Thanks, Peter


Re: max.task.idle.ms behavior

Posted by Peter Levart <pe...@gmail.com>.
Hi Matthias,

Thank you for the clarifications. May I ask about some more detail? I'll 
try to describe the behavior as I understand it. Please correct me if 
I'm wrong...

So in effect each task alternates between NORMAL and ENFORCED processing 
modes.

It starts in NORMAL mode where it is performing a kind of "merge" step
(as in merge sort) among assigned partitions where records in a
particular partition are assumed to be already sorted by timestamp. When
any of the partitions drains, the task pauses (and doesn't process
messages even if they have already been received by the last poll call)
and waits for the drained partitions to start delivering messages, so
that each partition has a candidate message to choose from before
continuing. The important point, as I see it, is that the merging
process can only progress in NORMAL mode when it can choose the earliest
message among all the partitions assigned to the task. If the timeout
kicks in while the task is paused, it enters ENFORCED processing mode.

In ENFORCED processing mode, all the partitions are un-paused again and
the merging continues, but it merges one chunk of messages from a
particular poll at a time before polling for the next chunk. If at some
point all the partitions have a candidate message in a particular polled
chunk, it reverts to NORMAL processing mode...
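
As a rough illustration of the merge step described above, here is a
small stand-alone sketch. It is not Kafka Streams code: the class name,
the pollEarliest method, and the use of bare Long timestamps in place of
records are all assumptions made for the example. In NORMAL mode it
refuses to pick a record unless every buffer has a candidate; in
ENFORCED mode it skips empty buffers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class MergeSketch {
    /**
     * Removes and returns the smallest head timestamp across all buffers,
     * or null if nothing can be chosen yet.
     */
    static Long pollEarliest(List<Deque<Long>> buffers, boolean enforced) {
        Deque<Long> best = null;
        for (Deque<Long> buf : buffers) {
            if (buf.isEmpty()) {
                if (!enforced) {
                    return null;   // NORMAL: need a candidate from every partition
                }
                continue;          // ENFORCED: ignore the empty partition
            }
            if (best == null || buf.peekFirst() < best.peekFirst()) {
                best = buf;
            }
        }
        return best == null ? null : best.pollFirst();
    }

    public static void main(String[] args) {
        List<Deque<Long>> buffers = new ArrayList<>();
        buffers.add(new ArrayDeque<>(Arrays.asList(1L, 4L, 7L))); // partition 0
        buffers.add(new ArrayDeque<>(Arrays.asList(2L, 3L)));     // partition 1

        Long ts;
        while ((ts = pollEarliest(buffers, false)) != null) {
            System.out.println("normal:   " + ts);
        }
        // Partition 1 is now drained; only enforced processing can continue.
        while ((ts = pollEarliest(buffers, true)) != null) {
            System.out.println("enforced: " + ts);
        }
    }
}
```

With the two example buffers, NORMAL mode yields timestamps 1, 2, 3 and
then stalls because partition 1 is empty; enforced processing then
yields the remaining 4 and 7.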

Is my understanding correct?

Thanks, Peter

On 1/16/19 8:15 AM, Matthias J. Sax wrote:
> The parameter applies to all three kinds of topics (input, intermediate,
> and repartition topics) and it's a global config.
>
> About the blocking behavior:
>
> If one partition becomes empty, all other partitions are paused() and
> Streams only poll()s for the empty partition. If no data is returned
> within the timeout, processing is enforced for this task for all partitions.
>
> The task will stay in the "enforced processing state" until all
> partitions deliver data again (at the same time).
>
> If a second partition becomes empty, no additional delay is applied.
>
> Hope this answers your question.
>
>
> -Matthias


Re: max.task.idle.ms behavior

Posted by "Matthias J. Sax" <ma...@confluent.io>.
The parameter applies to all three kinds of topics (input, intermediate,
and repartition topics) and it's a global config.

About the blocking behavior:

If one partition becomes empty, all other partitions are paused() and
Streams only poll()s for the empty partition. If no data is returned
within the timeout, processing is enforced for this task for all partitions.

The task will stay in the "enforced processing state" until all
partitions deliver data again (at the same time).

If a second partition becomes empty, no additional delay is applied.
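
The rules above can be modelled as a small state machine. The following
is only a sketch of the behavior as described in this message, not the
actual Kafka Streams implementation; the class TaskIdleSketch, its Mode
enum, and the mayProcess method are hypothetical names, and time is
passed in explicitly so the example is deterministic.

```java
public class TaskIdleSketch {
    enum Mode { NORMAL, IDLE, ENFORCED }

    final long maxTaskIdleMs;
    Mode mode = Mode.NORMAL;
    long idleSinceMs = -1;
    int idlePeriods = 0;   // how many times the task started waiting

    TaskIdleSketch(long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    /** Returns true if the task may process a record at time nowMs. */
    boolean mayProcess(boolean[] bufferHasData, long nowMs) {
        boolean allHaveData = true;
        boolean anyHasData = false;
        for (boolean b : bufferHasData) {
            allHaveData &= b;
            anyHasData |= b;
        }
        if (allHaveData) {              // every partition delivers data again
            mode = Mode.NORMAL;
            return true;
        }
        if (mode == Mode.ENFORCED) {    // another empty partition: no extra delay
            return anyHasData;
        }
        if (mode == Mode.NORMAL) {      // first empty partition: start idling
            mode = Mode.IDLE;
            idleSinceMs = nowMs;
            idlePeriods++;
        }
        if (nowMs - idleSinceMs >= maxTaskIdleMs) {
            mode = Mode.ENFORCED;       // timeout exceeded: enforce processing
            return anyHasData;
        }
        return false;                   // still waiting for the empty partition
    }

    public static void main(String[] args) {
        TaskIdleSketch task = new TaskIdleSketch(100);
        System.out.println(task.mayProcess(new boolean[] {true, true, true}, 0));
        System.out.println(task.mayProcess(new boolean[] {true, false, true}, 10));
        System.out.println(task.mayProcess(new boolean[] {true, false, true}, 110));
        System.out.println(task.mayProcess(new boolean[] {true, false, false}, 120));
        System.out.println(task.mayProcess(new boolean[] {true, true, true}, 130));
        System.out.println(task.mode + ", idle periods: " + task.idlePeriods);
    }
}
```

In this model the second partition draining at t=120 does not start a
new wait, so the task idles once per transition out of NORMAL mode
rather than once per drained partition. With the default of 0 the wait
expires immediately.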

Hope this answers your question.


-Matthias


On 1/15/19 2:07 AM, Peter Levart wrote:
> Another question about this parameter.
> 
> Does that parameter apply just to the input topics of the KafkaStreams
> topology, or also to intermediate (repartitioning) topics and to
> intermediate topics configured with the KStream.through() directive?
> 
> Is it possible to control the behavior on a per-topic basis?
> 
> Thanks,
> 
> Peter


Re: max.task.idle.ms behavior

Posted by Peter Levart <pe...@gmail.com>.
Another question about this parameter.

Does that parameter apply just to the input topics of the KafkaStreams
topology, or also to intermediate (repartitioning) topics and to
intermediate topics configured with the KStream.through() directive?

Is it possible to control the behavior on a per-topic basis?

Thanks,

Peter

On 1/15/19 11:00 AM, Peter Levart wrote:
> Hello!
>
> I'm trying to understand the behavior of Kafka Streams consumers with
> regard to the max.task.idle.ms configuration parameter (default 0). The
> documentation says:
>
> max.task.idle.ms     Medium     Maximum amount of time a stream task 
> will stay idle when not all of its partition buffers contain records.
>
> Suppose an input topic to a streams application has multiple
> partitions and that traffic arrives at this input topic in bursts. At
> first all partitions will be filled with records and all tasks will be
> busy. Then some partition will get drained first. The task with the
> drained partition will be paused for max.task.idle.ms.
>
> Question: How often will the task with the drained partition pause for
> max.task.idle.ms due to missing records in a partition?
>
> 1. Before every record?
> 2. At each KafkaConsumer.poll() that returns any records?
> 3. Just the 1st time some partition is detected to have no polled 
> records, but then this partition will get "blacklisted" from further 
> checks for pausing until it again receives a record.
>
> For example, if the task is assigned N partitions, what is the maximum
> number of times the task will pause for max.task.idle.ms once the input
> burst is over?
>
> The wishful answer is N-1 times. Is this so?
>
> Regards, Peter
>