Posted to users@kafka.apache.org by Alex Craig <al...@gmail.com> on 2020/04/09 13:33:41 UTC

Kafka Streams endless rebalancing

Hi all, I’ve got a Kafka Streams application running in a Kubernetes
environment.  The topology has 2 aggregations (and therefore 2 KTables),
both of which can get fairly large – the first is around 200GB and the
second around 500GB.  As with any K8s platform, pods can occasionally get
rescheduled or go down, which of course causes my application to
rebalance.  However, what I’m seeing is that the application will
literally spend hours rebalancing, without any errors being thrown or
other obvious causes for the frequent rebalances – all I can see in the
logs is that an instance will be restoring a state store from the
changelog topic, then suddenly have its partitions revoked and begin the
join-group process all over again.  (I’m running 10 pods/instances of my
app and see the same pattern in each instance.)  In some cases it never
really recovers from this rebalancing cycle – even after 12 hours or more
– and I’ve had to scale the application down completely and start over by
purging the application state and re-consuming from earliest on the
source topic.  Interestingly, after purging and starting from scratch,
the application seems to recover from rebalances pretty easily.

The storage I’m using is a NAS device, which admittedly is not
particularly fast (it uses spinning disks and is shared with other
tenants).  As an experiment, I tried switching the k8s storage to an
in-memory option (this is at the k8s layer – the application is still
using the same RocksDB stores) to see if that helps.  As it turns out, I
never have the rebalance problem when using an in-memory persistence
layer.  If a pod goes down, the application spends around 10–15 minutes
rebalancing and then is back to processing data again.

At this point I guess my main question is: when I’m using the NAS storage
and the state stores are fairly large, could I be hitting some timeout
somewhere that isn’t allowing the restore process to complete, which then
triggers another rebalance?  In other words, is the restore process
simply taking too long given the amount of data to restore and the slow
storage?  I’m currently using Kafka 2.4.1, but I saw the same behavior in
2.3.  I am using a custom RocksDB config setter to limit off-heap memory,
but I’ve tried removing it and saw no difference in the rebalance
problem.  Again, I’m not seeing any errors or anything else in the logs
that indicates why it can never finish rebalancing.  I’ve tried turning
on DEBUG logging, but I’m having a tough time sifting through the volume
of log messages, though I’m still looking.
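
(For context, a custom RocksDB config setter of the kind mentioned above
typically looks something like the following sketch.  It is illustrative
only, not the actual setter from this application: the class name and the
cache and buffer sizes are placeholders.)

    import java.util.Map;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Cache;
    import org.rocksdb.LRUCache;
    import org.rocksdb.Options;
    import org.rocksdb.WriteBufferManager;

    // Illustrative sketch: bound RocksDB off-heap memory by sharing one
    // block cache and one write-buffer manager across all stores in the
    // JVM.  The sizes are placeholder values.
    public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

        private static final Cache CACHE = new LRUCache(256 * 1024 * 1024L);
        private static final WriteBufferManager WRITE_BUFFER_MANAGER =
                new WriteBufferManager(64 * 1024 * 1024L, CACHE);

        @Override
        public void setConfig(final String storeName, final Options options,
                              final Map<String, Object> configs) {
            final BlockBasedTableConfig tableConfig =
                    (BlockBasedTableConfig) options.tableFormatConfig();
            tableConfig.setBlockCache(CACHE);
            tableConfig.setCacheIndexAndFilterBlocks(true);
            options.setTableFormatConfig(tableConfig);
            options.setWriteBufferManager(WRITE_BUFFER_MANAGER);
        }

        @Override
        public void close(final String storeName, final Options options) {
            // The cache and write-buffer manager are static and shared, so
            // they are deliberately not closed per store instance.
        }
    }

(A class like this would be registered through the rocksdb.config.setter
property, i.e. StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG.)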

If anyone has any ideas I would appreciate it, thanks!

Alex C

Re: Kafka Streams endless rebalancing

Posted by Peter Levart <pe...@gmail.com>.

Hi Alex,

What GC algorithm are you using and how big is your heap? GC pauses can
sometimes be measured in minutes, depending on the GC algorithm chosen,
the heap size and the workload.
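
(As an illustrative aside, one quick way to check whether GC time lines
up with the rebalances is to sample the JVM's garbage-collector MXBeans.
The sketch below is a hypothetical helper, not part of the application in
this thread.)

    import java.lang.management.GarbageCollectorMXBean;
    import java.lang.management.ManagementFactory;

    // Illustrative sketch: periodically print cumulative GC counts and
    // time, so jumps in collection time can be correlated with rebalances.
    public class GcWatcher {
        public static void main(final String[] args) throws InterruptedException {
            while (true) {
                for (final GarbageCollectorMXBean gc :
                        ManagementFactory.getGarbageCollectorMXBeans()) {
                    System.out.printf("GC %s: collections=%d, total time=%d ms%n",
                            gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
                }
                Thread.sleep(10_000L);  // sample every 10 seconds
            }
        }
    }

(Enabling GC logging, e.g. -Xlog:gc* on JDK 9+ or -verbose:gc with
-XX:+PrintGCDetails on JDK 8, shows individual pause times directly.)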

Peter

Re: Kafka Streams endless rebalancing

Posted by Alex Craig <al...@gmail.com>.
Yep, max poll interval is 2147483647 and session timeout is 120000 (2
minutes).  I don't have anything set for heartbeat.interval.ms, so it
must be using the default (3 seconds, I think).  Hmm, is it possible the
heartbeats might stop if the client app was so swamped with restoring
data (and maybe starved for CPU) that the broker ultimately kicked it out
of the group?  Seems unlikely, since the heartbeat would have to have
been missed for a full 2 minutes.  Getting the broker logs can be a
challenge, but I'll see if I can get hold of them.  Is there any text I
should be looking for, or are rebalances pretty clear in the logs?
Thanks again for the help!
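
(For reference, the settings described above correspond roughly to the
sketch below, applied unprefixed so that they reach the "main" consumer.
The application id and bootstrap servers are placeholders.)

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    // Illustrative sketch of the consumer-related settings described in
    // this thread, set without a prefix so they apply to the main consumer.
    public class StreamsConsumerSettings {
        public static Properties build() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");  // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");   // placeholder
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120_000);      // 2 minutes
            // heartbeat.interval.ms is left at its default (3 seconds)
            return props;
        }
    }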

Alex

Re: Kafka Streams endless rebalancing

Posted by John Roesler <vv...@apache.org>.
Hey Alex,

Huh.

Unprefixed configs apply to all consumers, but in this case that's
irrelevant, because only the "main" consumer participates in group
management (so the config only matters for the main consumer).

So you actually have max.poll.interval.ms set to Integer.MAX_VALUE,
which amounts to 25 days? I agree, in that case it doesn't seem like
it could be a slow batch. In fact, it couldn't be anything related to
polling, since you see rebalances sooner than 25 days.
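
(As a quick check of that figure: 2,147,483,647 ms divided by 86,400,000
ms per day is roughly 24.9 days.)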

If you have the broker logs, they'll contain the reason for the rebalance.
The only other thing I can think of that causes rebalances is failing to 
heartbeat. What do you have for session.timeout.ms and
heartbeat.interval.ms ?

If anyone else has any ideas, please jump in.

Thanks,
-John

Re: Kafka Streams endless rebalancing

Posted by Alex Craig <al...@gmail.com>.
Thanks John, I double-checked my configs and I've actually got
max.poll.interval.ms set to the max (not prefixed with anything, so
presumably that applies to the “main” consumer).  So I think that means
the problem isn’t due to a single batch of messages not getting
processed/committed within the polling cycle, right?  I guess what I’m
wondering is: could the OVERALL length of time needed to fully restore
the state stores (which could be multiple topics with multiple
partitions) be exceeding some timeout or threshold?  Thanks again for any
ideas,



Alex C


Re: Kafka Streams endless rebalancing

Posted by John Roesler <vv...@apache.org>.
Hi Alex,

It sounds like your theory is plausible. After a rebalance, Streams needs
to restore its stores from the changelog topics. Currently, Streams
performs this restore operation in the same loop that does processing and
polls the consumer for more records. If the restore batches (or the
processing) take too long, Streams won’t be able to call Consumer#poll
(on the “main” consumer) within the max.poll.interval, which causes the
Consumer’s heartbeat thread to assume the instance is unhealthy and stop
sending heartbeats, which in turn causes another rebalance.
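
(As an illustrative aside, a StateRestoreListener is one way to see how
long the restore batches are taking.  The sketch below is a hypothetical
helper, not something from this application, and would be registered with
KafkaStreams#setGlobalStateRestoreListener before calling start().)

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.StateRestoreListener;

    // Illustrative sketch: log restore progress per store/partition so the
    // log timestamps show how long each restore batch takes.
    public class LoggingRestoreListener implements StateRestoreListener {

        @Override
        public void onRestoreStart(final TopicPartition partition, final String storeName,
                                   final long startingOffset, final long endingOffset) {
            System.out.printf("Restore starting for %s %s: offsets %d..%d%n",
                    storeName, partition, startingOffset, endingOffset);
        }

        @Override
        public void onBatchRestored(final TopicPartition partition, final String storeName,
                                    final long batchEndOffset, final long numRestored) {
            System.out.printf("Restored batch of %d records for %s %s (through offset %d)%n",
                    numRestored, storeName, partition, batchEndOffset);
        }

        @Override
        public void onRestoreEnd(final TopicPartition partition, final String storeName,
                                 final long totalRestored) {
            System.out.printf("Restore complete for %s %s: %d records%n",
                    storeName, partition, totalRestored);
        }
    }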

You could try either adjusting the max poll interval for the _main_
consumer or decreasing the batch size for the _restore_ consumer to make
sure Streams can call poll() frequently enough to stay in the group.
There are prefixes you can add to the consumer configuration keys to
target the main or restore consumer.
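
(For illustration, those prefixes are applied to the config keys roughly
as in the sketch below; the specific values are arbitrary examples, not
recommendations from this thread.)

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    // Illustrative sketch: target the main and restore consumers
    // separately via the prefix helpers on StreamsConfig.
    public class PrefixedConsumerConfigs {
        public static Properties build() {
            final Properties props = new Properties();
            // Give the main consumer more time between polls during heavy processing.
            props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                    600_000);
            // Have the restore consumer fetch smaller batches so each poll returns quickly.
            props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                    200);
            return props;
        }
    }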

Also worth noting, we’re planning to change this up pretty soon, so that
restoration happens in a separate thread and doesn’t block polling like
this.

I hope this helps!
-John
