Posted to users@kafka.apache.org by Arjun <ar...@socialtwist.com> on 2014/02/25 11:15:15 UTC

Delay in fetching messages - High level consumer -Kafka 0.8

Hi,

I am using Kafka 0.8. I have 3 brokers on three systems and 3 ZooKeeper
nodes running.
I am using the high-level consumer from the examples folder of Kafka.
I am able to push messages into the queue, but retrieving them is
taking some time. Is there any way I can tune this?

I see this message in the Kafka console. Is it the reason the consumer
is slow?
Reconnect due to socket error: null

The producer is pushing the messages, as I can see using the Consumer
Offset Checker tool. The same tool also shows a lag for the consumer.


Thanks
Arjun Narasimha Kota

Re: Delay in fetching messages - High level consumer -Kafka 0.8

Posted by Arjun <ar...@socialtwist.com>.
Hi,

I will make the change, check whether things work, and let you know.

Thanks
Arjun Narasimha Kota


Re: Delay in fetching messages - High level consumer -Kafka 0.8

Posted by Jun Rao <ju...@gmail.com>.
The following config is probably what's causing the socket timeout. Try
something like 1000 ms.

MaxWait: 10000000 ms

Thanks,

Jun
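
For readers following along, here is a minimal sketch of how the suggestion could be applied when building the consumer properties. It assumes that the MaxWait value in the log corresponds to the consumer property fetch.wait.max.ms, and that the 30 second socket timeout mentioned elsewhere in this thread corresponds to socket.timeout.ms. The class FetchWaitCheck, its method, and the host name zk1:2181 are hypothetical and not part of Kafka or of the example code in the thread.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Kafka.
class FetchWaitCheck {

    // Builds properties of the kind the 0.8 high-level consumer reads,
    // refusing a fetch wait that is not shorter than the socket timeout.
    static Properties consumerProps(String zkConnect, String groupId,
                                    int fetchWaitMaxMs, int socketTimeoutMs) {
        if (fetchWaitMaxMs >= socketTimeoutMs) {
            throw new IllegalArgumentException(
                "fetch.wait.max.ms (" + fetchWaitMaxMs
                + ") should be lower than socket.timeout.ms ("
                + socketTimeoutMs + ")");
        }
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // How long the broker may hold a fetch request waiting for data.
        props.put("fetch.wait.max.ms", Integer.toString(fetchWaitMaxMs));
        // How long the consumer's socket read blocks before giving up.
        props.put("socket.timeout.ms", Integer.toString(socketTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // The value suggested above, with the 30 s socket timeout.
        Properties ok = consumerProps("zk1:2181", "group1", 1000, 30000);
        System.out.println(ok.getProperty("fetch.wait.max.ms"));

        // The value from the log is rejected by this check.
        try {
            consumerProps("zk1:2181", "group1", 10000000, 30000);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The resulting Properties object would then be passed to the consumer configuration in the usual way. The check itself is only a guard added for this sketch: if the broker is allowed to hold a fetch longer than the client is willing to wait on the socket, the client read can time out first.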


Re: Delay in fetching messages - High level consumer -Kafka 0.8

Posted by Arjun <ar...@socialtwist.com>.
Hi,

As I mentioned in the first message, I have checked the log and the
offsets using the Consumer Offset Checker tool. The consumer offset just
stalls, and there is a lag. I haven't specified any fetch size in the
consumer, so I guess the default size of 1 MB applies. All my messages
are smaller than that. My consumer code is not dead, because I am
monitoring it continuously.
I have one question: the setup has 12 partitions and only 3 consumers.
Will that affect the fetch time in any way? I will try reducing the
number of partitions and check once again.

Thanks for the inputs.

Thanks
Arjun Narasimha Kota
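
To make the 12 partitions and 3 consumers question concrete, here is a small sketch of how partitions might be divided. It assumes one stream per consumer and a contiguous, range-style split in which every partition is owned by exactly one consumer in the group. That matches my understanding of the 0.8 high-level consumer, but treat it as an illustration rather than Kafka's actual code. The class PartitionSplit is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not Kafka's rebalance implementation.
class PartitionSplit {

    // Gives each consumer a contiguous block of partition ids. When the
    // division is uneven, the first (partitions % consumers) consumers
    // receive one extra partition.
    static List<List<Integer>> assign(int partitions, int consumers) {
        if (partitions < 0 || consumers <= 0) {
            throw new IllegalArgumentException("need partitions >= 0 and consumers > 0");
        }
        int base = partitions / consumers;
        int extra = partitions % consumers;
        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // The setup from this thread: 12 partitions, 3 consumers.
        System.out.println(assign(12, 3));
        // prints [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]
    }
}
```

Under these assumptions each consumer owns 4 partitions, so having more partitions than consumers leaves no partition unread.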


Re: Delay in fetching messages - High level consumer -Kafka 0.8

Posted by Neha Narkhede <ne...@gmail.com>.
Arjun,

Have you looked at
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why
?

Thanks,
Neha


Re: Delay in fetching messages - High level consumer -Kafka 0.8

Posted by Arjun <ar...@socialtwist.com>.
The thing I found is that my ConsumerFetcherThreads are not going beyond
BoundedByteBufferReceive.readFrom. When I added a few more traces in
that function, I found that the call stalls after the expectIncomplete
function. I guess Utils.read is stalling for more than 30 sec, which is
the socket timeout.
Is there any way I can increase the socket timeout there? I don't
understand why the consumers are getting stuck there.

There are no errors on the brokers.

Thanks
Arjun Narasimha Kota
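
The behaviour described above (a request is written, then the read blocks until the socket timeout expires) can be reproduced outside Kafka. The sketch below uses plain java.net sockets rather than Kafka's BlockingChannel, so it only illustrates the mechanism: a peer that accepts the connection but never answers makes the read end in a java.net.SocketTimeoutException. The class ReadTimeoutDemo and the 200 ms timeout are choices made for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical demo; plain sockets stand in for the consumer and broker.
class ReadTimeoutDemo {

    // Returns true if a blocking read gave up with SocketTimeoutException.
    static boolean readTimesOut(int soTimeoutMs) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // The "broker" listens on a free port but never sends a response.
        try (ServerSocket silentServer = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, silentServer.getLocalPort())) {
            client.setSoTimeout(soTimeoutMs);
            // The request goes out fine, like "205 bytes written" in the trace.
            client.getOutputStream().write(new byte[205]);
            try {
                // Blocks until data arrives or the socket timeout expires.
                client.getInputStream().read();
                return false;
            } catch (SocketTimeoutException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read timed out: " + readTimesOut(200));
    }
}
```

If the other side is deliberately holding the request, as a long fetch wait would allow, raising the socket timeout only postpones the exception. Shortening the wait on the request side is the more direct change.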


Re: Delay in fetching messages - High level consumer -Kafka 0.8

Posted by Arjun <ar...@socialtwist.com>.
Adding to this, I have started logging in trace mode. I found that the
consumer fetcher threads are sending the metadata but are not receiving
anything back.
I see all the
"TRACE [ConsumerFetcherThread-group1_www.taf-dev.com-1393329622308-6e15dd12-0-1] [kafka.network.BoundedByteBufferSend] 205 bytes written"
lines, but no reading is taking place. I may be looking at the wrong
thing.

Can someone please help me out with this?

Thanks
Arjun Narasimha Kota


Re: Delay in fetching messages - High level consumer -Kafka 0.8

Posted by Arjun <ar...@socialtwist.com>.
Apart from that, I get this stack trace:

25 Feb 2014 15:45:22,636 WARN [ConsumerFetcherThread-group1_www.taf-dev.com-1393322165136-8318b07d-0-0] [kafka.consumer.ConsumerFetcherThread] [ConsumerFetcherThread-group1_www.taf-dev.com-1393322165136-8318b07d-0-0], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 32; ClientId: group1-ConsumerFetcherThread-group1_www.taf-dev.com-1393322165136-8318b07d-0-0; ReplicaId: -1; MaxWait: 10000000 ms; MinBytes: 1 bytes; RequestInfo: [taf.referral.emails.service,8] -> PartitionFetchInfo(1702,1048576)
java.net.SocketTimeoutException
     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
     at kafka.utils.Utils$.read(Utils.scala:395)
     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)


Does it affect anything? I haven't looked into it, as it was just a
warning. Should I be worried about this?


Thanks
Arjun Narasimha Kota
