Posted to users@kafka.apache.org by Valentin Golev <va...@gdeslon.ru> on 2016/12/22 16:42:56 UTC

Kafka won't replicate from a specific broker

Hello,

I have a three-broker Kafka setup: brokers 1 and 2 run Kafka 0.10.1.0, and
broker 1001 runs Kafka 0.10.0.0. After a failure of two of them, a lot of
the partitions now have the third one (1001) as their leader. It looks like this:

     Topic: userevents0.open  Partition: 5   Leader: 1     Replicas: 1,2,1001  Isr: 1,1001,2
     Topic: userevents0.open  Partition: 6   Leader: 2     Replicas: 2,1,1001  Isr: 1,2,1001
     Topic: userevents0.open  Partition: 7   Leader: 1001  Replicas: 1001,2,1  Isr: 1001
     Topic: userevents0.open  Partition: 8   Leader: 1     Replicas: 1,1001,2  Isr: 1,1001,2
     Topic: userevents0.open  Partition: 9   Leader: 1001  Replicas: 2,1001,1  Isr: 1001
     Topic: userevents0.open  Partition: 10  Leader: 1001  Replicas: 1001,1,2  Isr: 1001
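
This listing is the output of the topic describe tool, i.e. roughly the
following (with our ZooKeeper address replaced by a placeholder):

    bin/kafka-topics.sh --zookeeper zk1:2181 --describe --topic userevents0.open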

As you can see, only the partitions whose leader is 1 or 2 have successfully
replicated. Brokers 1 and 2, however, are unable to fetch data from 1001.

All of the partitions are available to consumers and producers, so
everything is fine except replication. Broker 1001 is reachable from the
other servers.
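
By "reachable" I mean at least at the TCP level: from the other hosts, a
plain connection check succeeds, along the lines of the following (the
hostname is a placeholder for 1001's address):

    nc -vz kafka1001 9092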

I can't restart broker 1001 because it seems that would cause data loss
(as you can see, it's the only in-sync replica for many partitions).
Restarting the other brokers didn't help at all, and neither did plain
waiting (this is the third day of it going on). So what do I do?
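
To spell out my data-loss worry: with 1001 as the only in-sync replica,
stopping it would force a new leader to be elected from an out-of-sync
replica, discarding whatever messages only 1001 has. If I've read the docs
right, I could at least disable unclean election per topic, something like
the following (ZooKeeper address again a placeholder), so those partitions
would go offline rather than silently lose data:

    bin/kafka-configs.sh --zookeeper zk1:2181 --alter --entity-type topics \
        --entity-name userevents0.open --add-config unclean.leader.election.enable=false

But that still wouldn't get replication going again.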

The logs of broker 2 (one of the brokers trying to fetch data from 1001)
are full of this:

[2016-12-22 16:38:52,199] WARN [ReplicaFetcherThread-0-1001], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@117a49bf (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1001 was disconnected before the response was read
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
        at scala.Option.foreach(Option.scala:257)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
        at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
        at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
        at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

The logs of broker 1001 are full of this:

[2016-12-22 16:38:54,226] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.nio.BufferUnderflowException
        at java.nio.Buffer.nextGetIndex(Buffer.java:506)
        at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:361)
        at kafka.api.FetchRequest$$anonfun$1$$anonfun$apply$1.apply(FetchRequest.scala:55)
        at kafka.api.FetchRequest$$anonfun$1$$anonfun$apply$1.apply(FetchRequest.scala:52)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at kafka.api.FetchRequest$$anonfun$1.apply(FetchRequest.scala:52)
        at kafka.api.FetchRequest$$anonfun$1.apply(FetchRequest.scala:49)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at kafka.api.FetchRequest$.readFrom(FetchRequest.scala:49)
        at kafka.network.RequestChannel$Request$$anonfun$2.apply(RequestChannel.scala:65)
        at kafka.network.RequestChannel$Request$$anonfun$2.apply(RequestChannel.scala:65)
        at kafka.network.RequestChannel$Request$$anonfun$4.apply(RequestChannel.scala:71)
        at kafka.network.RequestChannel$Request$$anonfun$4.apply(RequestChannel.scala:71)
        at scala.Option.map(Option.scala:146)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:71)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
        at kafka.network.Processor.run(SocketServer.scala:413)
        at java.lang.Thread.run(Thread.java:745)

Re: Kafka won't replicate from a specific broker

Posted by Valentin Golev <va...@gdeslon.ru>.
Hi, Ismael and Jan,

Thanks a lot for your prompt responses!

> Is inter.broker.protocol.version set correctly in brokers 1 and 2? It
> should be 0.10.0 so that they can talk to the older broker without issue.

I set it on broker 2, but it doesn't seem to help.
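
I'm on the wurstmeister image, so I set it through the container
environment; if I understand the image's convention correctly, KAFKA_*
variables are translated into server.properties keys, i.e. something like
this in the broker 2 service definition, followed by recreating the
container:

    KAFKA_INTER_BROKER_PROTOCOL_VERSION=0.10.0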

> The only option I know of is to reboot the affected broker. And upgrade
> to 0.10.1.1 as quickly as possible. We haven't seen this issue on
> 0.10.1.1.RC0.

I'm using https://hub.docker.com/r/wurstmeister/kafka/tags/ and there's
no such version there. The Kafka website doesn't offer it for download
either. Is 0.10.1.1 not considered stable yet? I'm not sure about using
it... Maybe a downgrade would work?

Re: restarting the faulty broker: as I understand it, to avoid losing
data I'd have to shut down the other parts of the cluster first, right?

-Valentin

On Thu, Dec 22, 2016 at 9:01 PM, Jan Omar <ja...@gmail.com> wrote:

> Unfortunately I think you hit this bug:
>
> https://issues.apache.org/jira/browse/KAFKA-4477
>
> The only option I know of is to reboot the affected broker. And upgrade to
> 0.10.1.1 as quickly as possible. We haven't seen this issue on 0.10.1.1.RC0.
>
> Regards
>
> Jan
>
>
> > On 22 Dec 2016, at 18:16, Ismael Juma <is...@juma.me.uk> wrote:
> >
> > Hi Valentin,
> >
> > Is inter.broker.protocol.version set correctly in brokers 1 and 2? It
> > should be 0.10.0 so that they can talk to the older broker without issue.
> >
> > Ismael

Re: Kafka won't replicate from a specific broker

Posted by Jan Omar <ja...@gmail.com>.
Unfortunately I think you hit this bug:

https://issues.apache.org/jira/browse/KAFKA-4477

The only option I know of is to reboot the affected broker, and to upgrade to 0.10.1.1 as quickly as possible. We haven't seen this issue on 0.10.1.1 RC0.
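
After the reboot, you can watch the replicas catch up with the describe
tool, e.g. (substitute your ZooKeeper address):

    bin/kafka-topics.sh --zookeeper zk1:2181 --describe --under-replicated-partitions

Once that list comes back empty, every partition is fully replicated again.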

Regards

Jan


> On 22 Dec 2016, at 18:16, Ismael Juma <is...@juma.me.uk> wrote:
> 
> Hi Valentin,
> 
> Is inter.broker.protocol.version set correctly in brokers 1 and 2? It
> should be 0.10.0 so that they can talk to the older broker without issue.
> 
> Ismael

Re: Kafka won't replicate from a specific broker

Posted by Ismael Juma <is...@juma.me.uk>.
Hi Valentin,

Is inter.broker.protocol.version set correctly in brokers 1 and 2? It
should be 0.10.0 so that they can talk to the older broker without issue.
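
That is, something along these lines in server.properties on each of
those brokers, followed by a rolling restart (the second line only
matters if you also bumped the message format during the upgrade):

    inter.broker.protocol.version=0.10.0
    log.message.format.version=0.10.0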

Ismael
