Posted to users@kafka.apache.org by Gerrit Jansen van Vuuren <ge...@gmail.com> on 2014/06/17 10:48:22 UTC
cannot replicate topics kafka inconsistent state
Hi,
I've installed Kafka 2.8.1 and created a topic using:
/opt/kafka/bin/kafka-topics.sh --create --topic "test" --zookeeper "localhost:2381" --partitions 2 --replication-factor 2
Then I opened a console producer and a console consumer.
I typed a few lines into the producer, and then the two Kafka brokers that
should hold the two replicas started throwing errors to their logs. The only
way to get Kafka back to normal is to delete all of the topic data in Kafka
and in ZooKeeper and restart.
The errors are:
broker1:
2014-06-17/01:40:32.137/PDT ERROR [kafka-processor-9092-5]:
kafka.network.Processor - Closing socket for /10.101.4.218 because of
error^C
kafka.common.KafkaException: This operation cannot be completed on a
complete request.
at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
at kafka.network.Processor.write(SocketServer.scala:375)
at kafka.network.Processor.run(SocketServer.scala:247)
at java.lang.Thread.run(Thread.java:744)
broker2:
2014-06-17/01:40:29.127/PDT WARN [ReplicaFetcherThread-0-215]:
kafka.consumer.SimpleConsumer - Reconnect due to socket error: null
2014-06-17/01:40:29.127/PDT ERROR [ReplicaFetcherThread-0-215]:
kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-0-215], Error in
fetch Name: FetchRequest; Version: 0; CorrelationId: 545271; ClientId:
ReplicaFetcherThread-0-215; ReplicaId: 218; MaxWait: 1000 ms; MinBytes: 1
bytes; RequestInfo: [test,1] -> PartitionFetchInfo(1,2147483647)
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.
at kafka.utils.Utils$.read(Utils.scala:376)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Each broker repeats its error in an endless loop.
My server config is:
----------------------------------------
num.network.threads=24
num.io.threads=24
socket.send.buffer.bytes=10485760
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=524288000
replica.lag.max.messages=5000000
replica.fetch.max.bytes=2097152
replica.fetch.wait.max.ms=1000
log.dir=/data
num.partitions=12
log.flush.interval.messages=200000
log.flush.interval.ms=2000
log.retention.hours=168
log.retention.mins=10080
log.retention.hours=168
log.retention.mins=10080
log.retention.bytes=2199023255552
replica.fetch.max.bytes=2147483647
log.segment.bytes=209715200
log.cleanup.interval.mins=10
default.replication.factor=2
zookeeper.connect=localhost:2381
zookeeper.connection.timeout.ms=1000000
----------
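One thing that may be worth checking in a config like the one above is whether any key is assigned more than once. The following is a minimal sketch, not part of the original setup, and the function name find_duplicate_keys is hypothetical. It assumes the broker reads the file as a standard Java properties file, where a later assignment of the same key replaces an earlier one. That assumption is consistent with the fetch request in the broker2 log, which shows PartitionFetchInfo(1,2147483647) rather than the earlier value of 2097152.

```python
from collections import defaultdict


def find_duplicate_keys(text):
    """Return {key: [values in file order]} for keys assigned more than once."""
    seen = defaultdict(list)
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines, comments, and separator lines without '='.
        if not line or line.startswith(("#", "!")) or "=" not in line:
            continue
        key, value = line.split("=", 1)
        seen[key.strip()].append(value.strip())
    return {key: values for key, values in seen.items() if len(values) > 1}


# Excerpt of the config posted above; only the repeated keys are included.
config = """\
replica.fetch.max.bytes=2097152
log.retention.hours=168
log.retention.mins=10080
log.retention.hours=168
log.retention.mins=10080
replica.fetch.max.bytes=2147483647
"""

duplicates = find_duplicate_keys(config)
for key, values in duplicates.items():
    # Assumption: the last assignment is the one that takes effect.
    print(f"{key}: {values} (assumed effective value: {values[-1]})")
```

Here replica.fetch.max.bytes appears with two different values, so under the stated assumption the effective fetch size would be 2147483647 bytes, not 2097152.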
Am I missing some configuration properties?
Regards,
Gerrit
Re: cannot replicate topics kafka inconsistent state
Posted by Gerrit Jansen van Vuuren <ge...@gmail.com>.
The network is 10 Gbit and so far has not given any issues. I think it's
extremely unlikely that the network is the cause (all ports are open and all
communication happens on an internal LAN).
I'm running consumers and producers on the nodes where the brokers are
running and they are consuming and producing data at high volumes between
the nodes.
While doing the test I was not running any producers or consumers other
than the test kafka-console-producer and kafka-console-consumer.
On Tue, Jun 17, 2014 at 4:28 PM, Jun Rao <ju...@gmail.com> wrote:
> Is your network stable?
>
> Thanks,
>
> Jun
Re: cannot replicate topics kafka inconsistent state
Posted by Jun Rao <ju...@gmail.com>.
Is your network stable?
Thanks,
Jun