Posted to users@kafka.apache.org by Gareth Collins <ga...@gmail.com> on 2021/04/04 19:41:42 UTC

Yet Another Repartitioning Question About Kafka Streams

Hi,

Thanks very much for answers to my previous questions here.

I had a couple more questions about repartitioning and I just want to
confirm my understanding.

(1) Given the following scenario:

(a) I have a cluster of Kafka Streams nodes with partitions assigned to each.

(b) One node goes down...and it goes down for long enough that a
repartition happens (i.e. a time greater than
scheduled.rebalance.max.delay.ms passes by).

(c) Then the node finally comes back. If the state is still there, can it
still be used (assuming it is assigned the same partitions), with only the
delta read from Kafka? Or will it need to read everything again to rebuild
the state? I assume it has to re-read the state, but I want to make sure.

(2) I understand warmup replicas help with minimizing downtime. If I
understand correctly, if I have at least one warmup replica configured and
the state needs to be rebuilt from scratch in the scenario above, the
switchover back to the old node will be delayed until the rebuild is
complete. Is my understanding correct? If so, why would you ever set more
than one warmup replica? Or should warmup replicas usually equal standby
replicas + 1, just in case multiple nodes are stood up simultaneously?

(3) If I set the scheduled rebalance delay to be greater than 0 and a node
goes down, will I be able to access the state data from other replicas
while I am waiting for the rebalance?

Any answers would be greatly appreciated.

thanks,
Gareth

Re: Yet Another Repartitioning Question About Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Gareth,

There is a checkpoint file, co-located with the state store data within the
state directory, that records the corresponding offset of the changelog for
that store. After the partition is migrated to new owners, this checkpoint
file, along with the state store itself, is not deleted immediately but
follows a cleanup delay policy.
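
So as long as the node comes back within that cleanup window and is assigned
the same tasks, it can pick up from the checkpointed offset and restore only
the delta from the changelog. As a rough illustration (the path and value
below are just examples, not recommendations), the relevant knobs are:

    // assuming: import java.util.Properties; import org.apache.kafka.streams.StreamsConfig;
    Properties props = new Properties();
    // Directory holding the per-task state stores and their checkpoint files.
    props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
    // How long state of tasks no longer assigned to this instance is kept on
    // disk before the background cleanup may delete it (10 minutes here).
    props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 10 * 60 * 1000L);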

Guozhang

-- 
-- Guozhang

Re: Yet Another Repartitioning Question About Kafka Streams

Posted by Gareth Collins <ga...@gmail.com>.
Hi Guozhang,

Thanks very much again for the answers!

One follow-up on the first question. Just so I understand it, how would the
returning node know where to continue from? I would assume that once we
repartition, the new node will own the position in the consumer group for
the relevant partition(s), so Kafka/ZooKeeper would no longer know the
position of the dead node. Is the position also stored somewhere alongside
the RocksDB state?

thanks in advance,
Gareth





Re: Yet Another Repartitioning Question About Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Gareth,

1) For this scenario, its state should be reusable and we do not need to
read from scratch from Kafka to rebuild.

2) "Warmup replicas" is just a special standby replica that is temporary,
note that if there's no partition migration needed at the moment, the
num.warmup.replicas is actually zero; the difference of
`max.warmup.replicas` config and the `num.standby.replicas` config is that,
the former is a global limit number, while the latter is a per task number.
I.e. if you have a total of N tasks, and you have these configs set as P
and Q, then during normal processing you'll have (Q+1) * N total replicas,
while during a rebalance you may have up to (Q+1) * N + P total replicas.
As you can see now, setting P to a larger than one value means that a
single rebalance run may be able to warm-up multiple partitions yet to be
moved with the cost of more space temporarily, while having a smaller
number means you may need more rounds of rebalances to achieve the end
rebalance goal.
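
As a concrete (made-up) sizing: with N = 10 stateful tasks, Q = 1 and P = 2,
you would have 2 * 10 = 20 task replicas during normal processing, and up to
22 while warmups are restoring. In config terms that would look roughly like:

    // assuming: import java.util.Properties; import org.apache.kafka.streams.StreamsConfig;
    Properties props = new Properties();
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);  // Q: one standby per task
    props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);   // P: at most two extra warmups in total
    // Steady state: (Q + 1) * N = 2 * 10 = 20 replicas (actives + standbys).
    // During a rebalance: up to (Q + 1) * N + P = 22, while warmups catch up
    // on their future owners before tasks are actually moved.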

3) Yes, if there are standby replicas then you can still access the
standbys' state via interactive queries (IQ). You can read this KIP for more details:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
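
For example, a minimal sketch of querying with stale stores enabled (the
store name, key/value types, and the `streams` instance are placeholders,
not taken from your topology):

    // assuming the usual imports: java.util.Map, org.apache.kafka.streams.*,
    // org.apache.kafka.streams.state.*, and that `streams` is a running KafkaStreams instance
    StoreQueryParameters<ReadOnlyKeyValueStore<String, Long>> params =
        StoreQueryParameters
            .fromNameAndType("counts-store", QueryableStoreTypes.<String, Long>keyValueStore())
            .enableStaleStores();  // allow serving from standby / restoring stores
    ReadOnlyKeyValueStore<String, Long> store = streams.store(params);
    Long value = store.get("some-key");
    // KIP-535 also exposes per-partition lag, so the caller can decide how stale is too stale:
    Map<String, Map<Integer, LagInfo>> lags = streams.allLocalStorePartitionLags();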


Guozhang



-- 
-- Guozhang