Posted to users@kafka.apache.org by Ryan Berdeen <rb...@hubspot.com> on 2014/03/14 00:05:01 UTC

Invalid topic metadata response

After reassigning a topic to 5 new brokers, the topic metadata returned by
the brokers for the topic is causing an exception when a producer tries to
read it:

java.util.NoSuchElementException: key not found: 7
...
at kafka.api.PartitionMetadata$.readFrom(TopicMetadata.scala:104)

The reassignment was from brokers 1, 4, 5, 7, 8 to 10, 11, 12, 13, 14. In
the topic metadata response, only the 5 new brokers (>= 10) are included,
but there is a partition whose ISR includes broker 7. It looks like this:

{:error-code 0, :partition-id 13, :leader-id 10, :num-replicas 4,
:replica-ids [10 12 13 14], :num-isr 5, :isr-ids [10 14 13 12 7]}

For all other partitions, the ISR and the replicas are the same brokers,
but for this partition, the ISR is larger. I stopped and started the
controller, and the topic metadata was corrected.
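
From the stack trace, the lookup that blows up appears to be the step that
resolves each ISR broker id against the brokers listed in the same metadata
response. A minimal sketch of that shape (not Kafka's actual code; the Broker
case class and host names below are made up) reproduces the same exception:

object IsrLookupSketch extends App {
  case class Broker(id: Int, host: String, port: Int)

  // Only the five new brokers appear in the metadata response.
  val brokers: Map[Int, Broker] =
    Seq(10, 11, 12, 13, 14).map(id => id -> Broker(id, "broker-" + id, 9092)).toMap

  // The stale ISR for partition 13 still references broker 7.
  val isrIds = Seq(10, 14, 13, 12, 7)

  // Per the trace, PartitionMetadata.readFrom does essentially this kind of
  // id-to-Broker lookup; the map has no default, so the missing key throws
  // java.util.NoSuchElementException: key not found: 7
  val isr = isrIds.map(brokers)
  println(isr)
}

So the response is internally inconsistent: the ISR names a broker id that the
broker list in the same response doesn't contain, and the client has no way to
resolve it.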

Has anyone seen this before? Is there anything in particular to look for in
the logs?

Thanks,

Ryan

Here's the full exception:

java.util.NoSuchElementException: key not found: 7
  at scala.collection.MapLike$class.default(MapLike.scala:225)
  at scala.collection.immutable.HashMap.default(HashMap.scala:38)
  at scala.collection.MapLike$class.apply(MapLike.scala:135)
  at scala.collection.immutable.HashMap.apply(HashMap.scala:38)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
  at scala.collection.immutable.VectorIterator.foreach(Vector.scala:648)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
  at scala.collection.immutable.Vector.foreach(Vector.scala:63)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
  at scala.collection.immutable.Vector.map(Vector.scala:63)
  at kafka.api.PartitionMetadata$.readFrom(TopicMetadata.scala:104)
  at kafka.api.TopicMetadata$$anonfun$readFrom$1.apply(TopicMetadata.scala:37)
  at kafka.api.TopicMetadata$$anonfun$readFrom$1.apply(TopicMetadata.scala:36)
  at scala.collection.immutable.Range.foreach(Range.scala:78)
  at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36)
  at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
  at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
  at scala.collection.immutable.Range.foreach(Range.scala:81)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
  at scala.collection.immutable.Range.map(Range.scala:46)
  at kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31)
  at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
  at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
  at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
  at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
  at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
  at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
  at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
  at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
  at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
  at kafka.producer.Producer.send(Producer.scala:76)
  at kafka.javaapi.producer.Producer.send(Producer.scala:42)

Re: Invalid topic metadata response

Posted by Neha Narkhede <ne...@gmail.com>.
Most of the bugs we hit were recoverable in the sense that doing a rolling
bounce on the cluster fixed the state of the topic. I updated the
documentation to add the warning.

Thanks,
Neha


On Fri, Mar 14, 2014 at 9:17 AM, Ryan Berdeen <rb...@hubspot.com> wrote:

> By the "reassignment tool", do you mean the tool as described at
>
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool,
> which just writes the JSON to ZooKeeper, or the code in KafkaController
> that is actually responsible for reassignment?
>
> Would it be appropriate to add a warning to the documentation, to the
> effect of "WARNING: in 0.8, this tool may corrupt the state of partition
> assignment, rendering your topic unusable"?
>
> Thanks,
>
> Ryan
>
>
> On Thu, Mar 13, 2014 at 7:09 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
>
> > There are some known bugs with the partition reassignment tool in 0.8. We
> > fixed a lot of those, tested it on a large set of partitions, and found it
> > to be stable in 0.8.1. Could you give 0.8.1 a spin instead?

Re: Invalid topic metadata response

Posted by Ryan Berdeen <rb...@hubspot.com>.
By the "reassignment tool", do you mean the tool as described at
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool,
which just writes the JSON to ZooKeeper, or the code in KafkaController
that is actually responsible for reassignment?
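
For what it's worth, the state that the tool and the controller manipulate is
all visible in ZooKeeper, so it's straightforward to check whether a
reassignment is still pending and what leader/ISR the controller last wrote.
A rough sketch with the plain ZooKeeper client (the connection string, topic
name, and partition number are placeholders):

import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

object ZkStateSketch extends App {
  // Placeholder connection string; point this at the cluster's ZooKeeper.
  val zk = new ZooKeeper("zk1:2181", 30000, new Watcher {
    override def process(event: WatchedEvent): Unit = () // no-op watcher
  })

  def show(path: String): Unit = {
    val data =
      if (zk.exists(path, false) == null) "(znode absent)"
      else new String(zk.getData(path, false, null), "UTF-8")
    println(path + " -> " + data)
  }

  // Present only while a reassignment is in flight; the controller deletes it
  // once every listed partition has completed.
  show("/admin/reassign_partitions")
  // Which broker is currently acting as controller.
  show("/controller")
  // Leader, leader epoch, and ISR as last written by the controller.
  show("/brokers/topics/my-topic/partitions/13/state")

  zk.close()
}

If /admin/reassign_partitions is still there long after the tool was run, the
controller never finished the move, which would line up with the stale ISR
above.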

Would it be appropriate to add a warning to the documentation, to the
effect of "WARNING: in 0.8, this tool may corrupt the state of partition
assignment, rendering your topic unusable"?

Thanks,

Ryan


On Thu, Mar 13, 2014 at 7:09 PM, Neha Narkhede <ne...@gmail.com> wrote:

> There are some known bugs with the partition reassignment tool in 0.8. We
> fixed a lot of those, tested it on a large set of partitions, and found it to
> be stable in 0.8.1. Could you give 0.8.1 a spin instead?

Re: Invalid topic metadata response

Posted by Neha Narkhede <ne...@gmail.com>.
There are some known bugs with the partition reassignment tool in 0.8. We
fixed a lot of those, tested it on a large set of partitions, and found it to
be stable in 0.8.1. Could you give 0.8.1 a spin instead?

