Posted to users@kafka.apache.org by "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com> on 2017/04/06 18:08:01 UTC

Replica selection at the time of topic creation

Hello,

I am trying to understand the algorithm used to choose the leader of a topic partition at topic creation time, in rack-unaware mode.

I went through TopicCommand.scala and AdminUtils.scala, especially the assignReplicasToBrokersRackUnaware() function (pasted below for convenience). My reading is that the leader for a topic is chosen randomly, the first follower is placed at an initial random shift from the leader, and the other followers are chosen by incrementing from the first follower's id.

Can someone please confirm that my understanding is correct?
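The follower rule above can be checked with a short sketch that copies the arithmetic of the replicaIndex helper called by the pasted function. The names `FollowerPlacement`, `followerIndex` and `replicasFor` are hypothetical, and all values are positions in the broker array rather than broker ids.

```scala
object FollowerPlacement {
  // Same arithmetic as AdminUtils.replicaIndex: the j-th follower sits 1 + (shift + j) % (n - 1)
  // positions after the leader, so it can never land on the leader's own position.
  def followerIndex(leaderIdx: Int, shift: Int, j: Int, nBrokers: Int): Int = {
    val offset = 1 + (shift + j) % (nBrokers - 1)
    (leaderIdx + offset) % nBrokers
  }

  // Hypothetical helper: the leader position followed by (replicationFactor - 1) follower positions.
  def replicasFor(leaderIdx: Int, shift: Int, replicationFactor: Int, nBrokers: Int): Seq[Int] =
    leaderIdx +: (0 until replicationFactor - 1).map(j => followerIndex(leaderIdx, shift, j, nBrokers))
}
```

With 5 brokers and the leader at position 0, a shift of 0 gives Seq(0, 1, 2) and a shift of 3 gives Seq(0, 4, 1): the followers occupy consecutive positions, but counting wraps around the array and skips the leader's own slot.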

I note that when a single topic is created with N partitions, the assignment algorithm arrives at a balanced distribution of partitions across the brokers. When N topics are created with 1 partition each, however, the resulting replica assignment may NOT be balanced across the brokers.
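The balance observation follows from the leader rule in the pasted code, leader position = (partitionId + startIndex) % nBrokers. Within one topic, startIndex is drawn once, so leaders walk round-robin over the brokers; each single-partition topic draws its own startIndex, so its leader is effectively a uniform random pick. A minimal sketch that counts leaders per broker position under both scenarios; the object and method names are hypothetical, and only leader placement is modelled, not followers.

```scala
import scala.util.Random

object LeaderSpread {
  // Leader position of partition p under the rack-unaware rule.
  def leaderIndex(p: Int, startIndex: Int, nBrokers: Int): Int = (p + startIndex) % nBrokers

  // One topic with nPartitions: a single random startIndex shared by every partition.
  def oneTopic(nPartitions: Int, nBrokers: Int, rand: Random): Map[Int, Int] = {
    val start = rand.nextInt(nBrokers)
    countLeaders((0 until nPartitions).map(p => leaderIndex(p, start, nBrokers)), nBrokers)
  }

  // nTopics topics with one partition each: a fresh random startIndex per topic.
  def manySinglePartitionTopics(nTopics: Int, nBrokers: Int, rand: Random): Map[Int, Int] =
    countLeaders((0 until nTopics).map(_ => leaderIndex(0, rand.nextInt(nBrokers), nBrokers)), nBrokers)

  // Number of partitions led by each broker position, including positions that lead none.
  private def countLeaders(leaders: Seq[Int], nBrokers: Int): Map[Int, Int] =
    (0 until nBrokers).map(b => b -> leaders.count(_ == b)).toMap
}
```

With 12 partitions on 4 brokers, oneTopic always yields 3 leaders per broker, whereas manySinglePartitionTopics only guarantees that the counts add up to 12; how they are spread depends on the random draws.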

The next part of my question is: are there plans to include an algorithm that does the leader and follower assignment based on the prevailing load of the brokers? Suppose I add a new broker to the cluster and start creating topics; I would expect the new broker to become the leader for the newly created topics. However, in my experiments this is not the case: there is no bias towards the new broker as the leader.

Are there any ways to introduce a bias towards choosing the new broker as the leader for the next several topics created?
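To make the question concrete, here is a sketch of what a load-aware leader choice could look like. It is hypothetical and not something Kafka does: `LoadBiasedLeaders.assign` and its `leaderCounts` input (number of partitions currently led per broker id) are illustrative names, and those counts would have to be gathered elsewhere. Each new partition is led by the broker that leads the fewest partitions so far; followers are simply the next brokers in sorted-id order.

```scala
import scala.collection.mutable

object LoadBiasedLeaders {
  // Hypothetical policy, not Kafka's: lead each new partition with the broker that currently
  // leads the fewest partitions (ties go to the lower broker id). Followers are the brokers
  // that follow the leader in sorted-id order, wrapping around.
  def assign(nPartitions: Int, replicationFactor: Int, leaderCounts: Map[Int, Int]): Seq[Seq[Int]] = {
    require(replicationFactor >= 1 && replicationFactor <= leaderCounts.size,
      "replication factor must be between 1 and the number of brokers")
    val brokers = leaderCounts.keys.toVector.sorted
    val counts = mutable.Map.empty[Int, Int] ++= leaderCounts
    (0 until nPartitions).map { _ =>
      val leader = brokers.minBy(b => (counts(b), b))
      counts(leader) += 1 // the new partition now counts towards this broker's load
      val pos = brokers.indexOf(leader)
      (0 until replicationFactor).map(i => brokers((pos + i) % brokers.size))
    }
  }
}
```

With brokers 1, 2 and 3 leading 4 partitions each and a new broker 4 leading none, assign(6, 2, Map(1 -> 4, 2 -> 4, 3 -> 4, 4 -> 0)) makes broker 4 the leader of the first four partitions and then moves on to brokers 1 and 2 once the counts are even. Followers are not load-balanced in this sketch, which a real policy would also need to handle.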

I understand that one can (a) specify the replica list for each topic at creation time, (b) use kafka-reassign-partitions.sh to reassign partitions, and (c) set auto.leader.rebalance.enable=true.
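Options (a) and (b) both take an explicit replica list per partition, in which the first broker id is the preferred leader. A sketch that renders such a list in the two text formats those tools accept, assuming the comma/colon syntax of `kafka-topics.sh --replica-assignment` and the version-1 JSON layout read by `kafka-reassign-partitions.sh --reassignment-json-file`; the object and method names are hypothetical.

```scala
object ExplicitAssignment {
  // For kafka-topics.sh --replica-assignment: partitions separated by commas, replica broker ids
  // within a partition separated by colons; the first id of each group is the preferred leader.
  def replicaAssignmentArg(replicas: Seq[Seq[Int]]): String =
    replicas.map(_.mkString(":")).mkString(",")

  // For kafka-reassign-partitions.sh --reassignment-json-file (version 1 layout).
  // Partition ids are taken from list positions; topic names are not JSON-escaped here,
  // which assumes names limited to letters, digits, '.', '_' and '-'.
  def reassignmentJson(topic: String, replicas: Seq[Seq[Int]]): String = {
    val parts = replicas.zipWithIndex.map { case (rs, p) =>
      s"""{"topic":"$topic","partition":$p,"replicas":[${rs.mkString(",")}]}"""
    }
    s"""{"version":1,"partitions":[${parts.mkString(",")}]}"""
  }
}
```

For example, replicaAssignmentArg(Seq(Seq(4, 1), Seq(4, 2))) yields "4:1,4:2", which would create two partitions with broker 4 as the preferred leader of both. Because option (c) rebalances leadership back to the preferred (first-listed) replica, it keeps leaders where such a list puts them; on its own it does not move leadership to a newly added broker.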

Thanks,
Buvana

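import scala.collection.{Map, Seq, mutable}
import scala.util.Random

// From AdminUtils.scala; the rand field and the replicaIndex helper come from the same file
// and are included so that the excerpt compiles on its own.
object AdminUtilsExcerpt {
  private val rand = new Random

  private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                                 replicationFactor: Int,
                                                 brokerList: Seq[Int],
                                                 fixedStartIndex: Int,
                                                 startPartitionId: Int): Map[Int, Seq[Int]] = {
    val ret = mutable.Map[Int, Seq[Int]]()
    val brokerArray = brokerList.toArray
    // Position of the first partition's leader: random unless a fixed index is given.
    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    var currentPartitionId = math.max(0, startPartitionId)
    // Seed for the followers' offset from the leader: also random unless fixed.
    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    for (_ <- 0 until nPartitions) {
      // After each full pass over the brokers, move the followers one position further.
      if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
        nextReplicaShift += 1
      // Leaders go round-robin over the broker array, starting at startIndex.
      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
      val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1)
        replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
      ret.put(currentPartitionId, replicaBuffer)
      currentPartitionId += 1
    }
    ret
  }

  // The j-th follower's offset is 1 + (shift + j) % (nBrokers - 1), i.e. between 1 and
  // nBrokers - 1, so a follower never lands on the leader's position.
  private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }
}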