Posted to users@kafka.apache.org by "Ahmed H." <ah...@gmail.com> on 2014/06/25 16:47:18 UTC

Kafka connection loss with high volume of messages

Hello All,

I am seeing this issue very frequently when running a high volume of
messages through Kafka. It starts off well, and it can go on for minutes
that way, but eventually it reaches a point where the connection to Kafka
dies, then it reconnects and carries on. This repeats more frequently when
I have been sending messages for a while. Basically, the more messages I
send, the more I see this. To give you an idea, a separate process is
writing about 500k messages to the queue. I see this issue on the consumer
side that is receiving those messages, after it has received about 60% of
the messages. Here is the stack trace:

10:36:14,238 WARN  [kafka.consumer.ConsumerFetcherThread]
(ConsumerFetcherThread-test.queue.default_localhost-1403704698110-71aecd8d-0-0)
[ConsumerFetcherThread-test.queue.default_localhost-1403704698110-71aecd8d-0-0],
Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 1;
ClientId: test.queue.default-ConsumerFetcherThread-test.queue_localhost-1403704698110-71aecd8d-0-0;
ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes;
RequestInfo: [test.queue.resync,0] -> PartitionFetchInfo(0,1048576),[test.queue,0] -> PartitionFetchInfo(0,1048576):
java.nio.channels.ClosedByInterruptException
  at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) [rt.jar:1.7.0_25]
  at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:650) [rt.jar:1.7.0_25]
  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:43) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:56) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:77) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:106) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]


Since it starts off working as it should, and only runs into this after
some time, I am inclined to believe that this may be a memory/GC issue? Not
quite sure.

I hope I explained it properly. I am having trouble describing it.

Thanks

Re: Kafka connection loss with high volume of messages

Posted by Neha Narkhede <ne...@gmail.com>.
11:57:26,897 INFO  [org.I0Itec.zkclient.ZkClient]
(pool-5-thread-1-EventThread) zookeeper state changed (Disconnected)
11:57:26,897 INFO  [org.I0Itec.zkclient.ZkClient]
(clojure-agent-send-off-pool-6-EventThread) zookeeper state changed (Disconnected)

I wonder why your consumer disconnects from zookeeper. Do you also see a
session expiration? Have you checked your GC logs to see if your consumer
process pauses for a long duration?
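
If GC logging isn't already enabled, you could turn it on for the consumer
JVM with something like the following HotSpot flags (the log path is just an
example) and look for long stop-the-world pauses around the time of the
disconnects:

-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
-XX:+PrintGCApplicationStoppedTime -Xloggc:/tmp/consumer-gc.log

A pause longer than the consumer's zookeeper.session.timeout.ms (6 seconds by
default, if I remember correctly) is usually enough for zookeeper to expire
the session, which triggers a rebalance; that would also explain the
ClosedByInterruptException, since the fetcher threads are stopped and
restarted during a rebalance.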


On Thu, Jun 26, 2014 at 9:00 AM, Ahmed H. <ah...@gmail.com> wrote:


Re: Kafka connection loss with high volume of messages

Posted by "Ahmed H." <ah...@gmail.com>.
I dug some more and it seems like before these errors show up, I see a few
Zookeeper warnings, followed by Kafka errors.

11:57:26,897 INFO  [org.I0Itec.zkclient.ZkClient]
(pool-5-thread-1-EventThread) zookeeper state changed (Disconnected)
11:57:26,897 INFO  [org.I0Itec.zkclient.ZkClient]
(clojure-agent-send-off-pool-6-EventThread) zookeeper state changed (Disconnected)
11:57:26,898 INFO  [kafka.consumer.SimpleConsumer]
(ConsumerFetcherThread-test-queue.default_localhost-1403795897237-bf49b4a5-0-0)
Reconnect due to socket error: : java.nio.channels.ClosedByInterruptException
  at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) [rt.jar:1.7.0_25]
  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:402) [rt.jar:1.7.0_25]
  at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220) [rt.jar:1.7.0_25]
  at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) [rt.jar:1.7.0_25]
  at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) [rt.jar:1.7.0_25]
  at kafka.utils.Utils$.read(Utils.scala:394) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.network.Receive$class.readCompletely(Transmission.scala:56) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:106) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]


Again, this starts happening towards the end of my large message sending
process.

Thanks


On Wed, Jun 25, 2014 at 6:02 PM, Neha Narkhede <ne...@gmail.com>
wrote:


Re: Kafka connection loss with high volume of messages

Posted by Neha Narkhede <ne...@gmail.com>.
If rebalance succeeded, then those error messages are harmless. Though I
agree we shouldn't log those in the first place.


On Wed, Jun 25, 2014 at 2:12 PM, Ahmed H. <ah...@gmail.com> wrote:


Re: Kafka connection loss with high volume of messages

Posted by "Ahmed H." <ah...@gmail.com>.
Unfortunately I do not have the logs on hand anymore, they were cleared
already.

With that said, I do recall seeing some rebalancing. It attempts to
rebalance a few times and eventually succeeds. In the past, I have had
cases where it tries rebalancing 4 times and gives up because it reached
its limit. In this particular situation, it didn't totally fail.
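
For reference, that retry limit should be the consumer's rebalance settings;
if I'm reading the defaults right they are roughly:

rebalance.max.retries=4
rebalance.backoff.ms=2000
zookeeper.session.timeout.ms=6000

so raising rebalance.max.retries and rebalance.backoff.ms (and possibly the
session timeout) in the consumer properties is presumably the knob to turn if
the rebalances ever start failing outright.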


On Wed, Jun 25, 2014 at 2:44 PM, Neha Narkhede <ne...@gmail.com>
wrote:


Re: Kafka connection loss with high volume of messages

Posted by Neha Narkhede <ne...@gmail.com>.
Do you see something like "begin rebalancing consumer" in your consumer
logs? Could you send around the full log4j output of the consumer?
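
Something like this against the consumer's log4j output should be enough to
spot the rebalance attempts (consumer.log is just a placeholder for wherever
your log4j appender writes):

grep -i "rebalancing consumer" consumer.log

If a "begin rebalancing" shows up shortly before each
ClosedByInterruptException, the fetch errors are just the fetchers being
restarted as part of the rebalance.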


On Wed, Jun 25, 2014 at 8:19 AM, Ahmed H. <ah...@gmail.com> wrote:


Re: Kafka connection loss with high volume of messages

Posted by "Ahmed H." <ah...@gmail.com>.
Are you referring to the zookeeper logs? If so, I am seeing a lot of those:

2014-06-25 11:15:02 NIOServerCnxn [WARN] caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x146958701700371, likely client has closed socket
  at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:220)
  at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
  at java.lang.Thread.run(Thread.java:724)
2014-06-25 11:15:02 NIOServerCnxn [WARN] caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x146958701700372, likely client has closed socket
  at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:220)
  at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
  at java.lang.Thread.run(Thread.java:724)
2014-06-25 11:15:02 NIOServerCnxn [WARN] caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x146958701700374, likely client has closed socket
  at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:220)
  at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
  at java.lang.Thread.run(Thread.java:724)
2014-06-25 11:15:02 NIOServerCnxn [WARN] caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x146958701700373, likely client has closed socket
  at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:220)
  at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
  at java.lang.Thread.run(Thread.java:724)


On Wed, Jun 25, 2014 at 11:15 AM, Guozhang Wang <wa...@gmail.com> wrote:


Re: Kafka connection loss with high volume of messages

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Ahmed,

Did you see any exceptions on the broker logs?

Guozhang


-- 
-- Guozhang