Posted to users@kafka.apache.org by Sameer Kumar <sa...@gmail.com> on 2018/01/09 07:42:30 UTC

Kafka Streams | Impact on rocksdb stores by Rebalancing

Hi,

I would like to understand how a rebalance affects state store migration.
If I have a cluster of 3 nodes and node3 goes down, its partitions get
assigned to node1 and node2. Does the RocksDB store on node1/node2 then
start updating itself from the changelog topic?

If yes, then what impact would this migration process have on querying?

Also, if the state store restoration process takes time, how can I make
sure another rebalance doesn't happen?

-Sameer.

Re: Kafka Streams | Impact on rocksdb stores by Rebalancing

Posted by Sameer Kumar <sa...@gmail.com>.
Ok, Matthias. Thanks for correcting.


Re: Kafka Streams | Impact on rocksdb stores by Rebalancing

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Sameer,

the KIP you are pointing to is not related to Kafka Streams'
task/partition assignment. Kafka Streams uses its own implementation of
a partition assignor (not the default one the consumer uses).

-Matthias

On 1/9/18 4:22 AM, Sameer Kumar wrote:
> Got It. Thanks. Others can also take a look at
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> 
> -Sameer.


Re: Kafka Streams | Impact on rocksdb stores by Rebalancing

Posted by Sameer Kumar <sa...@gmail.com>.
Got it. Thanks. Others can also take a look at
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

-Sameer.


Re: Kafka Streams | Impact on rocksdb stores by Rebalancing

Posted by Damian Guy <da...@gmail.com>.
Hi,

Yes, partition assignment is aware of the standby replicas. It will try
to assign tasks to the nodes that already have the state for a task, but
it will also try to keep the assignment balanced.
So the assignment will be more like your second example. If you are
interested, you can have a look at:
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java



Re: Kafka Streams | Impact on rocksdb stores by Rebalancing

Posted by Sameer Kumar <sa...@gmail.com>.
Hi Damian,

Thanks for your reply. I have some further questions.

Would the partition assignment be aware of the standby replicas? What
would take precedence in task distribution: load balancing or standby
replicas?

For example:

N1
assigned partitions: 1,2
standby partitions: 5,6

N2
assigned partitions: 3,4
standby partitions: 1,2

N3
assigned partitions: 5,6
standby partitions: 3,4

After N1 goes down, what would be the state of the cluster?

N2
assigned partitions: 3,4,1,2
standby partitions: 5,6

N3
assigned partitions: 5,6
standby partitions: 3,4,1,2

Or

N2
assigned partitions: 3,4,1
standby partitions: 2,5,6

N3
assigned partitions: 5,6,2
standby partitions: 1,3,4

-Sameer.


Re: Kafka Streams | Impact on rocksdb stores by Rebalancing

Posted by Damian Guy <da...@gmail.com>.
On Tue, 9 Jan 2018 at 07:42 Sameer Kumar <sa...@gmail.com> wrote:

> Hi,
>
> I would like to understand how a rebalance affects state store migration.
> If I have a cluster of 3 nodes and node3 goes down, its partitions get
> assigned to node1 and node2. Does the RocksDB store on node1/node2 then
> start updating itself from the changelog topic?
>
>
Yes, the stores will be migrated to node1 and node2, and they will be
restored from the changelog topic.
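
If you want to watch that restoration as it happens, here is a minimal
sketch of a restore listener, assuming Kafka Streams 1.0+, where
KafkaStreams#setGlobalStateRestoreListener is available (the class name
and log output are illustrative, not part of the API):

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StateRestoreListener;

public final class RestoreLogger {
    // Attach before streams.start(); logs changelog restoration progress.
    public static void attach(final KafkaStreams streams) {
        streams.setGlobalStateRestoreListener(new StateRestoreListener() {
            @Override
            public void onRestoreStart(final TopicPartition tp, final String store,
                                       final long startOffset, final long endOffset) {
                System.out.printf("restore of %s (%s) started: %d..%d%n",
                                  store, tp, startOffset, endOffset);
            }

            @Override
            public void onBatchRestored(final TopicPartition tp, final String store,
                                        final long batchEndOffset, final long numRestored) {
                System.out.printf("restored %d records of %s, now at offset %d%n",
                                  numRestored, store, batchEndOffset);
            }

            @Override
            public void onRestoreEnd(final TopicPartition tp, final String store,
                                     final long totalRestored) {
                System.out.printf("restore of %s finished, %d records replayed%n",
                                  store, totalRestored);
            }
        });
    }
}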


> If yes, then what impact would this migration process have on querying?
>

You can't query the stores until they have all been restored and the
rebalance ends.
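
A minimal sketch of guarding an interactive query against that window;
the store name "counts-store" is a placeholder, and
InvalidStateStoreException is what Streams throws while a store is
migrating or restoring:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public final class StoreLookup {
    // Block until the store is queryable again after a rebalance/restore.
    public static ReadOnlyKeyValueStore<String, Long> waitForStore(
            final KafkaStreams streams) throws InterruptedException {
        while (true) {
            try {
                // "counts-store" is a hypothetical store name.
                return streams.store("counts-store",
                        QueryableStoreTypes.<String, Long>keyValueStore());
            } catch (final InvalidStateStoreException e) {
                Thread.sleep(100); // store not ready yet; back off and retry
            }
        }
    }
}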

>
> Also, if the state store restoration process takes time, how can I make
> sure another rebalance doesn't happen?
>
>
If you don't lose any more nodes, another rebalance won't happen. If
node1 comes back online, there will be another rebalance; however, it
shouldn't take as long, since node1 will already have most of the state
locally and only needs to catch up with the remainder of the changelog.
Additionally, you should run with standby tasks. They are updated in the
background, which means that in the event of a failure the other nodes
should already have most of the state locally, so the restoration process
won't take as long.
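
A minimal sketch of enabling standby tasks via the standard Streams
config (the application id and bootstrap servers are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public final class StandbyProps {
    public static Properties build() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        // Keep one warm replica of each state store on another instance,
        // so failover only has to replay the tail of the changelog.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        return props;
    }
}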


> -Sameer.
>