Posted to users@kafka.apache.org by Gerrit Jansen van Vuuren <ge...@gmail.com> on 2014/06/17 10:48:22 UTC

cannot replicate topics kafka inconsistent state

Hi,

I've installed kafka 2.8.1,
created a topic using:

/opt/kafka/bin/kafka-topics.sh --create --topic "test" --zookeeper
"localhost:2381" --partitions 2 --replication-factor 2

Then I opened a console producer and a console consumer.
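The invocations were along these lines (a sketch; the broker port 9092 is taken from the processor thread name in the log below):

/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2381 --topic test
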
After I type a few lines on the producer, the two brokers that should hold the two replicas start throwing errors to their logs. The only way I've found to get Kafka back to normal is to delete all of the topic's data in Kafka and in ZooKeeper and restart (see the sketch below).
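
Concretely, the reset amounts to something like this (a sketch, assuming the brokers run from /opt/kafka, log.dir=/data as in the config below, and ZooKeeper's zkCli.sh is on the PATH):

# stop both brokers
/opt/kafka/bin/kafka-server-stop.sh
# delete the on-disk partition directories for the topic (named <topic>-<partition> under log.dir)
rm -rf /data/test-0 /data/test-1
# remove the topic's metadata znode from ZooKeeper
zkCli.sh -server localhost:2381 rmr /brokers/topics/test
# restart the brokers
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties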

The errors are:
broker1:

2014-06-17/01:40:32.137/PDT ERROR [kafka-processor-9092-5]: kafka.network.Processor - Closing socket for /10.101.4.218 because of error
kafka.common.KafkaException: This operation cannot be completed on a complete request.
        at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
        at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
        at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
        at kafka.network.Processor.write(SocketServer.scala:375)
        at kafka.network.Processor.run(SocketServer.scala:247)
        at java.lang.Thread.run(Thread.java:744)


broker2:

2014-06-17/01:40:29.127/PDT WARN  [ReplicaFetcherThread-0-215]: kafka.consumer.SimpleConsumer - Reconnect due to socket error: null
2014-06-17/01:40:29.127/PDT ERROR [ReplicaFetcherThread-0-215]: kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-0-215], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 545271; ClientId: ReplicaFetcherThread-0-215; ReplicaId: 218; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test,1] -> PartitionFetchInfo(1,2147483647)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.Utils$.read(Utils.scala:376)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)


The same error repeats endlessly on each broker.


My server config is:

----------------------------------------
num.network.threads=24
num.io.threads=24
socket.send.buffer.bytes=10485760
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=524288000
replica.lag.max.messages=5000000
replica.fetch.max.bytes=2097152
replica.fetch.wait.max.ms=1000
log.dir=/data
num.partitions=12
log.flush.interval.messages=200000
log.flush.interval.ms=2000
log.retention.hours=168
log.retention.mins=10080
log.retention.hours=168
log.retention.mins=10080
log.retention.bytes=2199023255552
replica.fetch.max.bytes=2147483647
log.segment.bytes=209715200
log.cleanup.interval.mins=10
default.replication.factor=2
zookeeper.connect=localhost:2381
zookeeper.connection.timeout.ms=1000000
----------------------------------------
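
One thing I notice writing this out: replica.fetch.max.bytes appears twice, and since a Java properties file keeps the last occurrence of a key, the effective value is 2147483647. That is exactly Int.MaxValue, and it is the same fetch size that shows up in PartitionFetchInfo(1,2147483647) in the broker2 error above. For scale (plain shell arithmetic, purely illustrative):

echo $(( 2**31 - 1 ))               # 2147483647, the effective replica.fetch.max.bytes (Int.MaxValue)
echo $(( 500 * 1024 * 1024 ))       # 524288000, socket.request.max.bytes (500 MB)
echo $(( 2147483647 > 524288000 ))  # prints 1: the replica fetch size far exceeds the socket request cap

(log.retention.hours and log.retention.mins are also each set twice, though with identical values.)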


Am I missing some configuration properties?


Regards,

 Gerrit

Re: cannot replicate topics kafka inconsistent state

Posted by Gerrit Jansen van Vuuren <ge...@gmail.com>.
The network is 10 gigabit and so far hasn't given any issues; I think it's extremely unlikely to be the network (all ports are open and all communication happens on an internal LAN).
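
The broker ports can also be probed directly with something like the following (illustrative only; 10.101.4.218 is from the broker1 log, and the .215 address is my guess at the peer based on the ReplicaFetcherThread-0-215 name):

nc -vz 10.101.4.218 9092
nc -vz 10.101.4.215 9092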

I normally run consumers and producers on the same nodes as the brokers, and they produce and consume data between the nodes at high volume. While running this test, however, the only clients were the test kafka-console-producer and kafka-console-consumer.




On Tue, Jun 17, 2014 at 4:28 PM, Jun Rao <ju...@gmail.com> wrote:

> Is your network stable?
>
> Thanks,
>
> Jun

Re: cannot replicate topics kafka inconsistent state

Posted by Jun Rao <ju...@gmail.com>.
Is your network stable?

Thanks,

Jun

