Posted to users@kafka.apache.org by Shrikant Patel <SP...@pdxinc.com> on 2017/03/02 21:20:53 UTC

Kafka partition not migrating to another broker.

I have a 5-broker Kafka cluster. Replication factor = 3, number of partitions = 12, minimum in-sync replicas (min.insync.replicas) = 3.

The first output below is with all servers up and running. The second output is after I bring down the server with id = 4. Another server from the ISR takes over as leader of server 4's partitions, so that's good. However, I was expecting another broker to join the ISR in place of server 4. Why doesn't that happen? Since min ISR is 3, I cannot publish to certain partitions of the topic.

The third output is after server 4 comes back up, at which point everything starts to work fine again.
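For reference, a topic like this would presumably have been created with something along the following lines (the topic name, broker address, and per-topic min.insync.replicas setting here are assumptions on my part; the minimum ISR could also be a broker-wide default):

./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic xxx \
    --partitions 12 --replication-factor 3 \
    --config min.insync.replicas=3

# With acks=all on the producer, writes to a partition whose ISR has shrunk
# below min.insync.replicas are rejected with NotEnoughReplicasException,
# which is why some partitions become unwritable while broker 4 is down.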

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic xxx
Topic:xxx   PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: xxx  Partition: 0    Leader: 5       Replicas: 5,2,3 Isr: 3,2,5
        Topic: xxx  Partition: 1    Leader: 1       Replicas: 1,3,4 Isr: 3,4,1
        Topic: xxx  Partition: 2    Leader: 2       Replicas: 2,4,5 Isr: 2,5,4
        Topic: xxx  Partition: 3    Leader: 3       Replicas: 3,5,1 Isr: 5,3,1
        Topic: xxx  Partition: 4    Leader: 4       Replicas: 4,1,2 Isr: 2,4,1
        Topic: xxx  Partition: 5    Leader: 5       Replicas: 5,3,4 Isr: 3,5,4
        Topic: xxx  Partition: 6    Leader: 1       Replicas: 1,4,5 Isr: 5,4,1
        Topic: xxx  Partition: 7    Leader: 2       Replicas: 2,5,1 Isr: 5,2,1
        Topic: xxx  Partition: 8    Leader: 3       Replicas: 3,1,2 Isr: 3,2,1
        Topic: xxx  Partition: 9    Leader: 4       Replicas: 4,2,3 Isr: 3,2,4
        Topic: xxx  Partition: 10   Leader: 5       Replicas: 5,4,1 Isr: 5,4,1
        Topic: xxx  Partition: 11   Leader: 1       Replicas: 1,5,2 Isr: 5,2,1


./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic xxx
Topic:xxx   PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: xxx  Partition: 0    Leader: 5       Replicas: 5,2,3 Isr: 3,2,5
        Topic: xxx  Partition: 1    Leader: 1       Replicas: 1,3,4 Isr: 3,1
        Topic: xxx  Partition: 2    Leader: 2       Replicas: 2,4,5 Isr: 2,5
        Topic: xxx  Partition: 3    Leader: 3       Replicas: 3,5,1 Isr: 5,3,1
        Topic: xxx  Partition: 4    Leader: 1       Replicas: 4,1,2 Isr: 2,1
        Topic: xxx  Partition: 5    Leader: 5       Replicas: 5,3,4 Isr: 3,5
        Topic: xxx  Partition: 6    Leader: 1       Replicas: 1,4,5 Isr: 5,1
        Topic: xxx  Partition: 7    Leader: 2       Replicas: 2,5,1 Isr: 5,2,1
        Topic: xxx  Partition: 8    Leader: 3       Replicas: 3,1,2 Isr: 3,2,1
        Topic: xxx  Partition: 9    Leader: 2       Replicas: 4,2,3 Isr: 2,3
        Topic: xxx  Partition: 10   Leader: 5       Replicas: 5,4,1 Isr: 5,1
        Topic: xxx  Partition: 11   Leader: 1       Replicas: 1,5,2 Isr: 5,2,1

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic xxx
Topic:xxx   PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: xxx  Partition: 0    Leader: 5       Replicas: 5,2,3 Isr: 3,2,5
        Topic: xxx  Partition: 1    Leader: 1       Replicas: 1,3,4 Isr: 3,1,4
        Topic: xxx  Partition: 2    Leader: 2       Replicas: 2,4,5 Isr: 2,5,4
        Topic: xxx  Partition: 3    Leader: 3       Replicas: 3,5,1 Isr: 5,3,1
        Topic: xxx  Partition: 4    Leader: 4       Replicas: 4,1,2 Isr: 2,1,4
        Topic: xxx  Partition: 5    Leader: 5       Replicas: 5,3,4 Isr: 3,5,4
        Topic: xxx  Partition: 6    Leader: 1       Replicas: 1,4,5 Isr: 5,1,4
        Topic: xxx  Partition: 7    Leader: 2       Replicas: 2,5,1 Isr: 5,2,1
        Topic: xxx  Partition: 8    Leader: 3       Replicas: 3,1,2 Isr: 3,2,1
        Topic: xxx  Partition: 9    Leader: 4       Replicas: 4,2,3 Isr: 2,3,4
        Topic: xxx  Partition: 10   Leader: 5       Replicas: 5,4,1 Isr: 5,1,4
        Topic: xxx  Partition: 11   Leader: 1       Replicas: 1,5,2 Isr: 5,2,1



Thanks,
Shri


RE: Kafka partition not migrating to another broker.

Posted by Shrikant Patel <SP...@pdxinc.com>.
Stevo

Thanks for the clarification.

Thanks,
Shri





Re: Kafka partition not migrating to another broker.

Posted by Stevo Slavić <ss...@gmail.com>.
Hello Shri,

That behavior is by current Apache Kafka design. At topic creation time, for every topic partition, the replication factor is converted into a replica set (the set of ids of the brokers that should replicate that partition), and those per-partition replica sets are the metadata that gets stored in ZooKeeper; the replication factor itself is not stored. The replication factor of a topic partition can be derived from the size of its replica set. There is no active component in Apache Kafka that moves or redistributes partition replicas across the available brokers as brokers are added to or removed from the cluster, or when they crash.
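As an illustration (a minimal sketch, assuming the stock zookeeper-shell.sh tool and the same placeholder topic name and ZooKeeper address as above), that stored metadata can be inspected directly:

./bin/zookeeper-shell.sh localhost:2181 get /brokers/topics/xxx
# {"version":1,"partitions":{"0":[5,2,3],"1":[1,3,4],"2":[2,4,5], ... }}
# Only the per-partition replica sets are stored; there is no separate
# replication-factor field, it is implied by the size of each list.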

In the scenario you simulated, when a broker crashes, one should be monitoring the availability of the brokers in the cluster, as well as metrics like ISR shrinks and under-replicated partitions, and then recover the crashed broker by either fixing it or bringing a new one up in its place (keeping the same broker id as the old one). If the old broker is fixed, it will catch up from the other replicas; if a new one with the same id is put in its place, it will replicate its partitions completely from the brokers that are leaders for those partitions. In either case there is no need to change the replica set of any partition that the broker was a member of.
Very similar to the crash scenario is another typical failure: a network partition, where the broker cannot be reached by the cluster controller or cannot reach the ZooKeeper ensemble. As soon as the network issue is resolved, the broker should start catching up.
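The stock tooling already surfaces both of those signals; a minimal sketch (the ZooKeeper address is a placeholder, and the JMX names are the standard ReplicaManager metrics):

# Partitions whose ISR is currently smaller than their replica set:
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --under-replicated-partitions

# JMX metrics worth alerting on:
#   kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
#   kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
#   kafka.server:type=ReplicaManager,name=IsrExpandsPerSec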

If there were a component doing automatic reassignment, it would have to make some tough decisions with hard-to-predict consequences. For example, if a broker became inaccessible to such a component and it decided to change the replica set of every partition that broker was a replica for, the brokers that took over as new replicas would start replicating those partitions from scratch, which could put a huge load on the cluster members, their disks, and the network. And that decision could turn out to be wrong, and the extra load unnecessary, if the broker that was presumed dead became accessible again or came back up (automatically, or brought back manually) shortly after the reassignment happened.

If you do a planned shrinking of the Kafka cluster, or you are adding new brokers to it, there is a command line tool you can use to change the replica assignment of existing partitions and balance them across the cluster; newly created topics automatically take into account which brokers are available. Note that balancing partitions is typically not the same thing as balancing load across the cluster, since load is usually not even across topics, and can be uneven across the partitions of a single topic as well.
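That tool is kafka-reassign-partitions.sh. A minimal sketch of the generate/execute/verify flow, assuming the same placeholder topic and ZooKeeper address, hypothetical file names, and a cluster shrinking away from broker 4:

cat > topics-to-move.json <<'EOF'
{"version": 1, "topics": [{"topic": "xxx"}]}
EOF

# Ask the tool to propose an assignment spread over brokers 1,2,3,5:
./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
    --topics-to-move-json-file topics-to-move.json \
    --broker-list "1,2,3,5" --generate

# Save the proposed assignment into reassignment.json, then apply and verify it.
# The optional --throttle (bytes/sec) caps the replication traffic the move generates.
./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
    --reassignment-json-file reassignment.json --execute --throttle 10000000
./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
    --reassignment-json-file reassignment.json --verify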

Replication and other Apache Kafka design and implementation topics are well covered in the documentation (see http://kafka.apache.org/documentation/#replication ). Some extra auto-balancing features are available in the Confluent Enterprise platform (e.g. see http://docs.confluent.io/3.2.0/cp-docker-images/docs/tutorials/automatic-data-balancing.html ).

Kind regards,
Stevo Slavic.

