Posted to users@kafka.apache.org by Derek Mok <de...@gmail.com> on 2022/08/29 14:54:40 UTC

Understanding Kafka Streams max.task.idle.ms

Hi, I'd like some help understanding exactly how max.task.idle.ms works.
I have a topology that consumes from an input topic A and uses a join
operator to enrich the topic A messages with a KTable built from a compacted
topic B. The enriched messages are output to topic C.

If I set max.task.idle.ms to 5000, what behaviour should I expect in the
following scenario (assuming all messages have the same key and the topics
have the same partition count):

   1. Message published to A with timestamp t_0
   2. Three seconds of wall clock time elapses
   3. Message published to B with timestamp t_0
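For reference, a minimal sketch of supplying the setting used in this
scenario, assuming the application is configured through a plain
java.util.Properties object with string keys (StreamsConfig.MAX_TASK_IDLE_MS_CONFIG
is the Kafka Streams constant for the same "max.task.idle.ms" key). The
application id and broker address are hypothetical placeholders, not values
taken from the linked project.

```java
import java.util.Properties;

public class IdleConfigSketch {

    // Builds Streams settings for the scenario above.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "max-task-idle-demo"); // hypothetical id
        props.put("bootstrap.servers", "localhost:9092");  // assumed local broker
        // Let a task stay idle for up to 5 s when some, but not all, of its
        // input partitions have buffered data.
        props.put("max.task.idle.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("max.task.idle.ms"));
    }
}
```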

My understanding is that, once max.task.idle.ms elapses, a message containing
the enriched result should be output to topic C, since the setting should
account for the late producer of topic B. The join operator should only be
invoked after max.task.idle.ms has elapsed. However, what actually happens
is that nothing is published to topic C. I suspect my understanding of
max.task.idle.ms is not entirely correct, so I would appreciate any insight
here!

Here is a small project demonstrating the above:
https://github.com/ThousandEyes-Derek/max-task-idle-ms

MaxTaskIdleStreamsApp contains the streams application that describes the
topology.
MessageProducer produces messages into the input and compacted topics based
on the scenario I described above.

Thanks!
Derek

Re: Understanding Kafka Streams max.task.idle.ms

Posted by Derek Mok <de...@gmail.com>.
Hi Alexandre, thanks for your reply. I updated my example to produce the
message to the compacted topic ("table") with a lower timestamp, but
unfortunately I'm still not getting the expected result. The scenario is now:

1. Message published to A ("input") with timestamp t_0
2. Three seconds of wall clock time elapses
3. Message published to B ("table") with timestamp (t_0 - 1)

On Tue, Aug 30, 2022 at 12:15 AM Alexandre Brasil <alexandre.brasil@gmail.com> wrote:

Re: Understanding Kafka Streams max.task.idle.ms

Posted by Alexandre Brasil <al...@gmail.com>.
Hi Derek,

What max.task.idle.ms does is set how long the stream application waits for
new messages when one or more input topics have no messages after a poll.
In your case, the application polls for the first time and finds a message
on topic A ("input") and no messages on topic B ("table"). Since you have
max.task.idle.ms set to 5000, it waits up to five seconds for messages to
arrive on topic B. When you produce your second message three seconds
later, the app will process the messages.
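The waiting behaviour described above can be sketched as a simplified,
hypothetical model of a task with one partition per input topic. It is not
Kafka Streams' actual implementation: the class and method names are
invented for illustration, and it ignores details such as fetch-lag checks.

```java
// Hypothetical, simplified model of the idling decision; not Kafka code.
public class IdleDecisionModel {

    // Returns true if the task should process records now, false if it
    // should keep idling while waiting for the empty input to receive data.
    static boolean readyToProcess(int bufferedA, int bufferedB,
                                  long idleStartMs, long nowMs, long maxIdleMs) {
        if (bufferedA == 0 && bufferedB == 0) {
            return false; // nothing buffered at all
        }
        if (bufferedA > 0 && bufferedB > 0) {
            return true;  // every input has data, so no need to wait
        }
        if (maxIdleMs < 0) {
            return true;  // per the Kafka 3.x docs, -1 disables idling
        }
        // One input is empty: wait until the idle budget is used up.
        return nowMs - idleStartMs >= maxIdleMs;
    }

    public static void main(String[] args) {
        // A has a record, B is empty, 3 s have passed: still waiting.
        System.out.println(readyToProcess(1, 0, 0, 3_000, 5_000)); // false
        // B's record arrives at 3 s: both inputs have data.
        System.out.println(readyToProcess(1, 1, 0, 3_000, 5_000)); // true
        // Had B stayed empty, waiting would stop at 5 s.
        System.out.println(readyToProcess(1, 0, 0, 5_000, 5_000)); // true
    }
}
```

With the numbers from this thread, the model keeps waiting at 3 s while
topic B is empty and processes as soon as B's record arrives; it would only
fall back to processing A alone if B stayed empty for the full 5 s.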

Kafka Streams will process the messages from both topics in timestamp
order, but since both of your messages have the same timestamp, my guess is
that it's processing the message from the stream ("input") first and finds
nothing to join to on "table". My guess is that if you set the second
message's timestamp lower than the first message's timestamp, you'll get
the result you want.
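The effect of processing order on the join can be illustrated with a second
hypothetical model of a KStream-KTable inner join that processes one record
from each input in timestamp order. The names are invented for illustration,
and the tie-break (stream record first on equal timestamps) is an assumption
that mirrors the guess above; it is not documented Kafka Streams behaviour.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a KStream-KTable inner join; not Kafka Streams code.
public class TimestampOrderJoinModel {

    // A buffered record: key, value and timestamp.
    static final class Rec {
        final String key;
        final String value;
        final long timestamp;

        Rec(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Processes one stream record and one table record in timestamp order
    // and returns the enriched records that would be written to topic C.
    static List<String> run(Rec streamRec, Rec tableRec) {
        Map<String, String> table = new HashMap<>(); // KTable state
        List<String> output = new ArrayList<>();
        if (tableRec.timestamp < streamRec.timestamp) {
            table.put(tableRec.key, tableRec.value); // table update first
            join(streamRec, table, output);
        } else {
            // Stream record first, including on ties (assumed tie-break).
            join(streamRec, table, output);
            table.put(tableRec.key, tableRec.value);
        }
        return output;
    }

    // Inner join: emit only when the table already holds the key.
    static void join(Rec streamRec, Map<String, String> table, List<String> output) {
        String match = table.get(streamRec.key);
        if (match != null) {
            output.add(streamRec.value + "+" + match);
        }
    }

    public static void main(String[] args) {
        long t0 = 1_000L;
        Rec input = new Rec("k", "a", t0);
        System.out.println(run(input, new Rec("k", "b", t0)));     // []
        System.out.println(run(input, new Rec("k", "b", t0 - 1))); // [a+b]
    }
}
```

Under these assumptions, equal timestamps produce no output because the
stream record is joined against an empty table, while a table timestamp of
t_0 - 1 yields one enriched record.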

Regards,
Alexandre

Re: Understanding Kafka Streams max.task.idle.ms

Posted by Leif Wickland <lw...@magnite.com.INVALID>.
unsubscribe