Posted to dev@kafka.apache.org by "Chris Curtin (JIRA)" <ji...@apache.org> on 2013/03/04 19:21:13 UTC

[jira] [Created] (KAFKA-783) Preferred replica assignment on leader failure may not be correct

Chris Curtin created KAFKA-783:
----------------------------------

             Summary: Preferred replica assignment on leader failure may not be correct
                 Key: KAFKA-783
                 URL: https://issues.apache.org/jira/browse/KAFKA-783
             Project: Kafka
          Issue Type: Bug
          Components: replication
    Affects Versions: 0.8
         Environment: $ uname -a
Linux vrd01.atlnp1 2.6.32-279.el6.x86_64 #1 SMP Fri Jun 22 12:19:21 UTC 2012 x86_64 x86_64 x86_64 GNU/Linux

$ java -version
java version "1.6.0_25"
Java(TM) SE Runtime Environment (build 1.6.0_25-b06)
Java HotSpot(TM) 64-Bit Server VM (build 20.0-b11, mixed mode)

Kafka 0.8.0 loaded from HEAD on 1/29/2013
            Reporter: Chris Curtin
            Assignee: Neha Narkhede


Based on an email thread in the user group, Neha asked me to submit this.

Original question: "I ran another test, again starting with a full cluster and all partitions
had a full set of copies. When I stop the broker which was leader for 9 of
the 10 partitions, the leaders were all elected on one machine instead of
the set of 3. Should the leaders have been better spread out? Also the
copies weren’t fully populated either."

Neha: "For problem 2, we always try to make the preferred replica (1st replica
in the list of all replicas for a partition) the leader, if it is
available. We intended to spread the preferred replica for all partitions
for a topic evenly across the brokers. If this is not happening, we need to
look into it. Please can you file a bug and describe your test case there?"

Configuration:
4 node cluster
1 topic with 3 replicas
10 partitions: 0-9 below


Current status:

Partition: 0:vrd01.atlnp1 R:[  vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1]
Partition: 1:vrd01.atlnp1 R:[  vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 2:vrd01.atlnp1 R:[  vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd02.atlnp1]
Partition: 3:vrd03.atlnp1 R:[  vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 4:vrd01.atlnp1 R:[  vrd03.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd02.atlnp1]
Partition: 5:vrd03.atlnp1 R:[  vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 6:vrd01.atlnp1 R:[  vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1]
Partition: 7:vrd01.atlnp1 R:[  vrd02.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 8:vrd03.atlnp1 R:[  vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 9:vrd01.atlnp1 R:[  vrd04.atlnp1 vrd03.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1]
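
As a rough cross-check of the listing above, the sketch below tallies how many partitions name each broker as their first replica and how many each broker currently leads. The class and method names are hypothetical, the host names are shortened, and the two lists are copied by hand from the listing rather than read from the cluster.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PreferredReplicaTally {
    // Count how many times each broker name appears in the given list.
    static Map<String, Integer> tally(List<String> brokers) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String broker : brokers) {
            Integer n = counts.get(broker);
            counts.put(broker, n == null ? 1 : n + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        // First entry of R:[...] for partitions 0-9, copied from the listing.
        List<String> preferred = Arrays.asList("vrd03", "vrd04", "vrd01", "vrd02", "vrd03",
                "vrd04", "vrd01", "vrd02", "vrd03", "vrd04");
        // Current leader for partitions 0-9, copied from the same listing.
        List<String> leaders = Arrays.asList("vrd01", "vrd01", "vrd01", "vrd03", "vrd01",
                "vrd03", "vrd01", "vrd01", "vrd03", "vrd01");
        System.out.println("preferred: " + tally(preferred));
        System.out.println("leaders:   " + tally(leaders));
    }
}
```

On this data the first-listed replicas come out as vrd01=2, vrd02=2, vrd03=3, vrd04=3, while the leaders come out as vrd01=7, vrd03=3.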

Shutting down vrd03 led to:

Partition: 0:vrd01.atlnp1 R:[ ] I:[]
Partition: 1:vrd01.atlnp1 R:[  vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 2:vrd01.atlnp1 R:[ ] I:[]
*Partition: 3:vrd04.atlnp1 R:[ ] I:[]
Partition: 4:vrd01.atlnp1 R:[ ] I:[]
*Partition: 5:vrd04.atlnp1 R:[ ] I:[]
Partition: 6:vrd01.atlnp1 R:[ ] I:[]
Partition: 7:vrd01.atlnp1 R:[  vrd02.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
*Partition: 8:vrd04.atlnp1 R:[ ] I:[]
Partition: 9:vrd01.atlnp1 R:[ ] I:[]
(* means leader changed)

Note that partitions 3, 5 and 8 were assigned new leaders.

Per the mailing-list thread with Neha, the new leader should be the preferred replica. So partition 3 should have gotten vrd02, partition 5 vrd04, and partition 8 vrd02 (since vrd03 was shut down). Instead, partitions 3, 5 and 8 all got vrd04.
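
The expectation above can be sketched as follows, assuming "preferred replica" means the first entry in the assigned replica list that is still alive. This is only an illustration of that reading, not Kafka's controller logic: the class and method names are hypothetical, host names are shortened, and ISR membership is ignored.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ExpectedLeader {
    // Return the first replica, in assignment order, that is still alive; null if none is.
    static String expectedLeader(List<String> assignedReplicas, Set<String> liveBrokers) {
        for (String replica : assignedReplicas) {
            if (liveBrokers.contains(replica)) {
                return replica;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // vrd03 has been shut down, so only these brokers are alive.
        Set<String> live = new HashSet<String>(Arrays.asList("vrd01", "vrd02", "vrd04"));
        // Replica lists for partitions 3, 5 and 8, copied from the "Current status" listing.
        System.out.println("3 -> " + expectedLeader(Arrays.asList("vrd02", "vrd03", "vrd04"), live));
        System.out.println("5 -> " + expectedLeader(Arrays.asList("vrd04", "vrd02", "vrd03"), live));
        System.out.println("8 -> " + expectedLeader(Arrays.asList("vrd03", "vrd02", "vrd04"), live));
    }
}
```

Under this reading the sketch yields vrd02, vrd04 and vrd02 for partitions 3, 5 and 8. Compared with the observed leaders, partition 5 (vrd04) matches, while partitions 3 and 8 do not.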

Restarting vrd03 led to:

Partition: 0:vrd01.atlnp1 R:[  vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd03.atlnp1]
Partition: 1:vrd01.atlnp1 R:[  vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 2:vrd01.atlnp1 R:[  vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 3:vrd04.atlnp1 R:[  vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 4:vrd01.atlnp1 R:[  vrd03.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 5:vrd04.atlnp1 R:[  vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 6:vrd01.atlnp1 R:[  vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd03.atlnp1]
Partition: 7:vrd01.atlnp1 R:[  vrd02.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 8:vrd04.atlnp1 R:[  vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 9:vrd01.atlnp1 R:[  vrd04.atlnp1 vrd03.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd03.atlnp1]

Stopping vrd01 now led to:

*Partition: 0:vrd04.atlnp1 R:[ ] I:[]
*Partition: 1:vrd04.atlnp1 R:[ ] I:[]
*Partition: 2:vrd02.atlnp1 R:[ ] I:[]
Partition: 3:vrd04.atlnp1 R:[  vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
*Partition: 4:vrd02.atlnp1 R:[ ] I:[]
Partition: 5:vrd04.atlnp1 R:[  vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
*Partition: 6:vrd04.atlnp1 R:[ ] I:[]
*Partition: 7:vrd04.atlnp1 R:[ ] I:[]
Partition: 8:vrd04.atlnp1 R:[  vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
*Partition: 9:vrd04.atlnp1 R:[ ] I:[]

(* means leader changed)

So partitions 0, 2, 4, 6 and 7 were assigned the wrong leader if the preferred replica is the first in the list. If it is the last in the list, partitions 1 and 2 are wrong.

Java code:

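import java.util.ArrayList;
import java.util.List;

import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

public class PrintTopicMetadata {
    public static void main(String[] args) {
        // Arguments: host, port, socket timeout (ms), buffer size (bytes), client id.
        SimpleConsumer consumer = new SimpleConsumer("vrd04.atlnp1",
                9092,
                100000,
                64 * 1024, "test");
        try {
            List<String> topics2 = new ArrayList<String>();
            topics2.add("storm-anon");
            TopicMetadataRequest req = new TopicMetadataRequest(topics2);
            kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

            List<kafka.javaapi.TopicMetadata> data3 = resp.topicsMetadata();

            for (kafka.javaapi.TopicMetadata item : data3) {
                for (kafka.javaapi.PartitionMetadata part : item.partitionsMetadata()) {
                    // Assigned replicas, in assignment order.
                    String replicas = "";
                    for (kafka.cluster.Broker replica : part.replicas()) {
                        replicas += " " + replica.host();
                    }

                    // In-sync replicas as reported in the metadata response.
                    String isr = "";
                    for (kafka.cluster.Broker replica : part.isr()) {
                        isr += " " + replica.host();
                    }

                    // Same output format as the listings above.
                    System.out.println("Partition: " + part.partitionId() + ":" + part.leader().host()
                            + " R:[ " + replicas + "] I:[" + isr + "]");
                }
            }
        } finally {
            // Release the socket even if the metadata request fails.
            consumer.close();
        }
    }
}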

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira