Posted to dev@kafka.apache.org by Allen Wang <aw...@netflix.com.INVALID> on 2015/03/17 18:34:34 UTC

Broker is not aware of new partitions assigned

Hello,

I developed a tool to add partitions and assign new partitions to a set of
brokers in one operation by utilizing the API
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().

It worked well in most cases. However, in one case, I found that the
brokers were not aware of the new partitions assigned to them, even though
the ZooKeeper data clearly shows the assignment.

Here is the zookeeper data for the partition:

{"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}

On broker 62, the error message is:

2015-03-17 17:11:57,157 WARN  kafka.utils.Logging$class:83 [kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with correlation id 2048464 from client x on partition [m,71] failed due to Partition [m,71] doesn't exist on 62

Here is the core function of the tool:

  def addPartitionsToTopic(zkClient: ZkClient, topic: String,
                           brokersToAssignPartitions: Seq[Int], brokers: Seq[Int],
                           execute: Boolean): Unit = {
    val existingPartitionsReplicaList =
      ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
    val config = AdminUtils.fetchTopicConfig(zkClient, topic)
    printf("Topic config: %s\n\n", config)
    if (existingPartitionsReplicaList.size == 0)
      throw new AdminOperationException("The topic %s does not exist".format(topic))
    val currentPartitions = existingPartitionsReplicaList.size
    val replicationFactor = existingPartitionsReplicaList.map(e => e._2.size).max
    val brokersWithPartitions = existingPartitionsReplicaList.flatMap(e => e._2).toSet.toSeq
    if (brokersToAssignPartitions.intersect(brokersWithPartitions).size > 0) {
      printf("Topic %s already has partitions on brokers %s. Skipping.\n",
        topic, brokersToAssignPartitions)
      return
    }
    val totalBrokers = brokers.size
    val oldBrokers = totalBrokers - brokersToAssignPartitions.size
    if (oldBrokers == 0) {
      throw new IllegalArgumentException(
        "Cannot add partitions to new brokers without existing partitions")
    }
    val expectedPartitions = currentPartitions * totalBrokers / oldBrokers
    val newPartitions = expectedPartitions - currentPartitions
    if (newPartitions <= 0) {
      throw new IllegalArgumentException(
        "Invalid number of new partitions %d".format(newPartitions))
    }
    val newPartitionReplicaList =
      AdminUtils.assignReplicasToBrokers(brokersToAssignPartitions, newPartitions,
        replicationFactor, startPartitionId = currentPartitions)
    val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
    // add the new list
    partitionReplicaList ++= newPartitionReplicaList
    printf("Changing number of partitions from %d to %d to topic %s\n\n",
      currentPartitions, expectedPartitions, topic)
    printf("Replica reassignment for new partitions:\n\n%s\n\n",
      getAssignmentJson(topic, newPartitionReplicaList))
    printf("Complete replica assignment:\n\n%s\n\n",
      getAssignmentJson(topic, partitionReplicaList))
    if (execute) {
      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient,
        topic, partitionReplicaList, config, update = true)
      println("New partitions are added")
    } else {
      println("No update is executed in dry run mode")
    }
  }
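
[Editor's sketch] The sizing arithmetic in the function above can be tried
in isolation. The snippet below is only an illustration: the object name,
the helper signature, and the cluster sizes are made up for the example.
It shows that the partition count is scaled in proportion to the broker
count and that the integer division truncates.

```scala
object PartitionMath {
  // Mirrors the sizing arithmetic in addPartitionsToTopic: grow the partition
  // count in proportion to the broker count, using integer division.
  // totalBrokers includes the new brokers; newBrokers is how many are being added.
  def expectedPartitions(currentPartitions: Int, totalBrokers: Int, newBrokers: Int): Int = {
    val oldBrokers = totalBrokers - newBrokers
    require(oldBrokers > 0, "need at least one existing broker")
    currentPartitions * totalBrokers / oldBrokers
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical cluster: 60 partitions on 10 brokers, 2 brokers being added.
    val expected = expectedPartitions(currentPartitions = 60, totalBrokers = 12, newBrokers = 2)
    println(s"expected=$expected, new=${expected - 60}")
    // Hypothetical uneven case: 7 partitions, 3 existing brokers, 1 added.
    // 7 * 4 / 3 is evaluated with integer division, so the fraction is dropped.
    println(expectedPartitions(currentPartitions = 7, totalBrokers = 4, newBrokers = 1))
  }
}
```

Because of the truncation, a small topic on a large cluster can yield zero
new partitions, which is the case the newPartitions <= 0 check rejects.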

It seems to me that the new assignment in the ZooKeeper data did not
propagate to some of the new brokers. However, TopicCommand uses the
same AdminUtils function to add new partitions.

Am I missing anything, or is this a bug in the broker?

Thanks,
Allen

Re: Broker is not aware of new partitions assigned

Posted by Allen Wang <aw...@netflix.com.INVALID>.
I am not sure the Kafka version difference is the issue, since adding
partitions to other topics later worked. Are there any protocol-level
compatibility issues between 0.8.2.1 and 0.8.1.1?

Also, restarting the controller seems to fix the problem.

On Tue, Mar 17, 2015 at 4:08 PM, Mayuresh Gharat <gharatmayuresh15@gmail.com> wrote:

> Probably you can try restarting the controller and have same version for
> the controller and the brokers.
> BTW, was there any specific reason you are running 2 different versions for
> the controller and other brokers?
>
> Thanks,
>
> Mayuresh
>

Re: Broker is not aware of new partitions assigned

Posted by Mayuresh Gharat <gh...@gmail.com>.
You could try restarting the controller and running the same version on
the controller and the brokers.
BTW, was there any specific reason you are running two different versions
on the controller and the other brokers?

Thanks,

Mayuresh

On Tue, Mar 17, 2015 at 4:02 PM, Allen Wang <aw...@netflix.com.invalid>
wrote:

> Yes, the watcher is still alive. The log in the controller indicates that
> it observed the changes.
>
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Re: Broker is not aware of new partitions assigned

Posted by Allen Wang <aw...@netflix.com.INVALID>.
Yes, the watcher is still alive. The log in the controller indicates that
it observed the changes.


On Tue, Mar 17, 2015 at 2:05 PM, Mayuresh Gharat <gharatmayuresh15@gmail.com> wrote:

> I think the way reassignment works is asynchronous. Changes are made to
> zookeeper but those changes get reflected only when controller watcher
> fires for the respective zookeeper path. Is your watcher still alive?
>
> Thanks,
>
> Mayuresh
>

Re: Broker is not aware of new partitions assigned

Posted by Mayuresh Gharat <gh...@gmail.com>.
I think reassignment works asynchronously. Changes are written to
ZooKeeper, but they take effect only when the controller's watcher
fires for the respective ZooKeeper path. Is your watcher still alive?
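
[Editor's sketch] The asynchronous behaviour described above can be
pictured with a toy model. This is not Kafka or ZooKeeper code: ToyStore,
its methods, and the path and value strings are hypothetical names made up
for the illustration. The only point is that a write can be visible in the
store before any subscriber has been told about it.

```scala
import scala.collection.mutable

object AsyncPropagationDemo {
  // Toy stand-in for a watched path store: writes are visible to readers
  // immediately, but listeners are only notified when deliverPending() runs.
  class ToyStore {
    private var data = Map.empty[String, String]
    private val listeners = mutable.Buffer.empty[(String, String) => Unit]
    private val pending = mutable.Queue.empty[(String, String)]

    def subscribe(listener: (String, String) => Unit): Unit = listeners += listener

    def write(path: String, value: String): Unit = {
      data += (path -> value)          // the store itself is updated right away
      pending.enqueue(path -> value)   // the notification is only queued
    }

    def read(path: String): Option[String] = data.get(path)

    // Stands in for the moment the watcher fires and changes are forwarded.
    def deliverPending(): Unit =
      while (pending.nonEmpty) {
        val (p, v) = pending.dequeue()
        listeners.foreach(_(p, v))
      }
  }

  def main(args: Array[String]): Unit = {
    val store = new ToyStore
    val brokerView = mutable.Map.empty[String, String] // what a broker has been told
    store.subscribe((p, v) => brokerView(p) = v)

    store.write("/topics/m", "partition 71 -> [62,74]")
    println(s"store has assignment:  ${store.read("/topics/m").isDefined}")
    println(s"broker knows (before): ${brokerView.contains("/topics/m")}")
    store.deliverPending()
    println(s"broker knows (after):  ${brokerView.contains("/topics/m")}")
  }
}
```

In this model, a notification that is never delivered leaves the store and
the subscriber's view permanently out of step, which is the kind of gap the
question about the watcher is probing for.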

Thanks,

Mayuresh

On Tue, Mar 17, 2015 at 1:29 PM, Allen Wang <aw...@netflix.com.invalid>
wrote:

> Looking a bit more into controller log, it seems that when the partition
> assignment is changed in ZooKeeper, the controller has quite a lot
> exceptions communicating with new brokers where the partitions are
> assigned. One thing to note is that the new brokers have Kafka version
> 0.8.2.1 and the controller has Kafka version 0.8.1.1.
>
> 2015-03-16 22:36:58,178 WARN  kafka.utils.Logging$class:89 [Controller-2-to-broker-48-send-thread] [warn] [Controller-2-to-broker-48-send-thread], Controller 2 fails to send a request to broker id:48,host:xyz:7101
> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>         at kafka.utils.Utils$.read(Utils.scala:376)
>         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>         at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> Does it explain why the brokers are not aware of the new assignments? Is
> there anyway to recover from this communication problem, like restarting
> the controller?
>
> Thanks,
> Allen
>
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Re: Broker is not aware of new partitions assigned

Posted by Allen Wang <aw...@netflix.com.INVALID>.
Looking a bit more into the controller log, it seems that when the
partition assignment is changed in ZooKeeper, the controller hits quite a
lot of exceptions communicating with the new brokers where the partitions
are assigned. One thing to note is that the new brokers run Kafka version
0.8.2.1 and the controller runs Kafka version 0.8.1.1.

2015-03-16 22:36:58,178 WARN  kafka.utils.Logging$class:89
[Controller-2-to-broker-48-send-thread] [warn]
[Controller-2-to-broker-48-send-thread], Controller 2 fails to send a
request to broker id:48,host:xyz:7101
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.Utils$.read(Utils.scala:376)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
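
To see which brokers the controller failed to reach, warnings like the one
above can be tallied per broker id. The following is only a minimal sketch: it
assumes the warning text matches the excerpt above, and the object and method
names (ControllerSendFailures, failuresByBroker) are illustrative and not part
of Kafka.

```scala
import scala.util.matching.Regex

object ControllerSendFailures {
  // Matches the warning text from the excerpt, e.g.
  // "Controller 2 fails to send a request to broker id:48,host:xyz:7101".
  // \s+ between words tolerates the line wrapping introduced by email.
  private val FailurePattern: Regex =
    """Controller\s+(\d+)\s+fails\s+to\s+send\s+a\s+request\s+to\s+broker\s+id:(\d+)""".r

  /** Counts "fails to send a request" warnings per target broker id. */
  def failuresByBroker(logText: String): Map[Int, Int] =
    FailurePattern.findAllMatchIn(logText)
      .map(m => m.group(2).toInt)   // group 2 is the target broker id
      .toSeq
      .groupBy(identity)
      .map { case (brokerId, hits) => brokerId -> hits.size }

  def main(args: Array[String]): Unit = {
    // Assumption: the path to the controller log is passed as the first argument.
    val source = scala.io.Source.fromFile(args(0))
    try {
      failuresByBroker(source.mkString).toSeq.sortBy(_._1).foreach {
        case (brokerId, count) => println(s"broker $brokerId: $count failed send(s)")
      }
    } finally {
      source.close()
    }
  }
}
```

Comparing the broker ids reported this way with the brokers listed in the new
partition assignment would show whether the unaware brokers are the same ones
the controller could not reach.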

Does this explain why the brokers are not aware of the new assignments? Is
there any way to recover from this communication problem, such as restarting
the controller?

Thanks,
Allen

