You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by pankaj ojha <pa...@gmail.com> on 2014/08/06 16:34:37 UTC

Issue in using kafka SimpleConsumer code

Hi Team,

I have a requirement of reading real time data using kafka and write to
cassandra.
For this I am using SimpleConsumer to read data from Kafka topics and
writing into Cassandra.
I am maintaining offsets of topics in my log files.
The issue is that after few days like 3-4 days my cosumer code does not
read data from kafka topics and produce below log output :

20:01:17,068  INFO NposKafkaConsumer:48 - Taking partition from application
properties
20:01:17,482 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT =
100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF =
64512 (requested -1).
20:01:17,545 DEBUG SimpleConsumer:52 - Disconnecting from <IP address>:9092
20:01:17,578 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() method
Inside while loop :: Value of max_reads::1
20:01:17,662 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT =
100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF =
64512 (requested -1).
20:01:17,804 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
20:01:18,804 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() method
Inside while loop :: Value of max_reads::1
20:01:18,826 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
20:01:19,827 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() method
Inside while loop :: Value of max_reads::1
20:01:19,852 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping

NposKafkaConsumer is my main SimpleCosumer class.
But, when I restart the kafka process by incrementing the offset by one
then again my code starts running fine for next few days.

Can you please help me how I can solve this and where I am going wrong ?

Thanks & Regards,
Pankaj Ojha

Re: Issue in using kafka SimpleConsumer code

Posted by Harsha <ka...@harsha.io>.
I suspect kafka.common.OffsetOutOfRangeException happening for your
consumer if incrementing offset + 1 works. Try grep the server.log for
that exception. I am not sure of any known issues with simpleconsumer
that might causing this.  Also copy this thread to
users@kafka.apache.org . Other users might have experience this issue
before.

On Thu, Aug 7, 2014, at 06:54 AM, pankaj ojha wrote:
> Hi Harsha,
> 
> Thank you for replying.
> There is no error in the kafka logs. The output I had pasted in my
> previous
> mail is from kafka logs only.
> But when I restart the kafka process after incrementing the offset by 1,
> then it runs fine.
> Is there any known issue with the SimpleConsumer code?
> 
> Thanks,
> Pankaj
> 
> 
> On Thu, Aug 7, 2014 at 7:17 PM, Harsha <ka...@harsha.io> wrote:
> 
> > Hi Pankaj,
> >             Do you notice any errors in kafka logs when your consumer
> >             stops reading data. Attaching those logs would be helpful in
> >             finding the issue.
> > -Harsha
> >
> > On Thu, Aug 7, 2014, at 06:12 AM, pankaj ojha wrote:
> > > Hi Team,
> > >
> > > Can you please provide any information on my above problem?
> > >
> > > Thanks & Regards,
> > > Pankaj Ojha
> > >
> > >
> > > On Wed, Aug 6, 2014 at 8:04 PM, pankaj ojha <pa...@gmail.com>
> > > wrote:
> > >
> > > > Hi Team,
> > > >
> > > > I have a requirement of reading real time data using kafka and write to
> > > > cassandra.
> > > > For this I am using SimpleConsumer to read data from Kafka topics and
> > > > writing into Cassandra.
> > > > I am maintaining offsets of topics in my log files.
> > > > The issue is that after few days like 3-4 days my cosumer code does not
> > > > read data from kafka topics and produce below log output :
> > > >
> > > > 20:01:17,068  INFO NposKafkaConsumer:48 - Taking partition from
> > > > application properties
> > > > 20:01:17,482 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT
> > =
> > > > 100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536),
> > SO_SNDBUF =
> > > > 64512 (requested -1).
> > > > 20:01:17,545 DEBUG SimpleConsumer:52 - Disconnecting from <IP
> > address>:9092
> > > > 20:01:17,578 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run()
> > method
> > > > Inside while loop :: Value of max_reads::1
> > > > 20:01:17,662 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT
> > =
> > > > 100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536),
> > SO_SNDBUF =
> > > > 64512 (requested -1).
> > > > 20:01:17,804 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
> > > > 20:01:18,804 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run()
> > method
> > > > Inside while loop :: Value of max_reads::1
> > > > 20:01:18,826 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
> > > > 20:01:19,827 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run()
> > method
> > > > Inside while loop :: Value of max_reads::1
> > > > 20:01:19,852 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
> > > >
> > > > NposKafkaConsumer is my main SimpleCosumer class.
> > > > But, when I restart the kafka process by incrementing the offset by one
> > > > then again my code starts running fine for next few days.
> > > >
> > > > Can you please help me how I can solve this and where I am going wrong
> > ?
> > > >
> > > > Thanks & Regards,
> > > > Pankaj Ojha
> > > >
> > > >
> >

Re: Issue in using kafka SimpleConsumer code

Posted by pankaj ojha <pa...@gmail.com>.
Hi Harsha,

Thank you for replying.
There is no error in the kafka logs. The output I had pasted in my previous
mail is from kafka logs only.
But when I restart the kafka process after incrementing the offset by 1,
then it runs fine.
Is there any known issue with the SimpleConsumer code?

Thanks,
Pankaj


On Thu, Aug 7, 2014 at 7:17 PM, Harsha <ka...@harsha.io> wrote:

> Hi Pankaj,
>             Do you notice any errors in kafka logs when your consumer
>             stops reading data. Attaching those logs would be helpful in
>             finding the issue.
> -Harsha
>
> On Thu, Aug 7, 2014, at 06:12 AM, pankaj ojha wrote:
> > Hi Team,
> >
> > Can you please provide any information on my above problem?
> >
> > Thanks & Regards,
> > Pankaj Ojha
> >
> >
> > On Wed, Aug 6, 2014 at 8:04 PM, pankaj ojha <pa...@gmail.com>
> > wrote:
> >
> > > Hi Team,
> > >
> > > I have a requirement of reading real time data using kafka and write to
> > > cassandra.
> > > For this I am using SimpleConsumer to read data from Kafka topics and
> > > writing into Cassandra.
> > > I am maintaining offsets of topics in my log files.
> > > The issue is that after few days like 3-4 days my cosumer code does not
> > > read data from kafka topics and produce below log output :
> > >
> > > 20:01:17,068  INFO NposKafkaConsumer:48 - Taking partition from
> > > application properties
> > > 20:01:17,482 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT
> =
> > > 100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536),
> SO_SNDBUF =
> > > 64512 (requested -1).
> > > 20:01:17,545 DEBUG SimpleConsumer:52 - Disconnecting from <IP
> address>:9092
> > > 20:01:17,578 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run()
> method
> > > Inside while loop :: Value of max_reads::1
> > > 20:01:17,662 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT
> =
> > > 100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536),
> SO_SNDBUF =
> > > 64512 (requested -1).
> > > 20:01:17,804 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
> > > 20:01:18,804 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run()
> method
> > > Inside while loop :: Value of max_reads::1
> > > 20:01:18,826 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
> > > 20:01:19,827 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run()
> method
> > > Inside while loop :: Value of max_reads::1
> > > 20:01:19,852 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
> > >
> > > NposKafkaConsumer is my main SimpleCosumer class.
> > > But, when I restart the kafka process by incrementing the offset by one
> > > then again my code starts running fine for next few days.
> > >
> > > Can you please help me how I can solve this and where I am going wrong
> ?
> > >
> > > Thanks & Regards,
> > > Pankaj Ojha
> > >
> > >
>

Re: Issue in using kafka SimpleConsumer code

Posted by Harsha <ka...@harsha.io>.
Hi Pankaj,
            Do you notice any errors in kafka logs when your consumer
            stops reading data. Attaching those logs would be helpful in
            finding the issue.
-Harsha

On Thu, Aug 7, 2014, at 06:12 AM, pankaj ojha wrote:
> Hi Team,
> 
> Can you please provide any information on my above problem?
> 
> Thanks & Regards,
> Pankaj Ojha
> 
> 
> On Wed, Aug 6, 2014 at 8:04 PM, pankaj ojha <pa...@gmail.com>
> wrote:
> 
> > Hi Team,
> >
> > I have a requirement of reading real time data using kafka and write to
> > cassandra.
> > For this I am using SimpleConsumer to read data from Kafka topics and
> > writing into Cassandra.
> > I am maintaining offsets of topics in my log files.
> > The issue is that after few days like 3-4 days my cosumer code does not
> > read data from kafka topics and produce below log output :
> >
> > 20:01:17,068  INFO NposKafkaConsumer:48 - Taking partition from
> > application properties
> > 20:01:17,482 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT =
> > 100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF =
> > 64512 (requested -1).
> > 20:01:17,545 DEBUG SimpleConsumer:52 - Disconnecting from <IP address>:9092
> > 20:01:17,578 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() method
> > Inside while loop :: Value of max_reads::1
> > 20:01:17,662 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT =
> > 100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF =
> > 64512 (requested -1).
> > 20:01:17,804 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
> > 20:01:18,804 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() method
> > Inside while loop :: Value of max_reads::1
> > 20:01:18,826 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
> > 20:01:19,827 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() method
> > Inside while loop :: Value of max_reads::1
> > 20:01:19,852 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
> >
> > NposKafkaConsumer is my main SimpleCosumer class.
> > But, when I restart the kafka process by incrementing the offset by one
> > then again my code starts running fine for next few days.
> >
> > Can you please help me how I can solve this and where I am going wrong ?
> >
> > Thanks & Regards,
> > Pankaj Ojha
> >
> >

Re: Issue in using kafka SimpleConsumer code

Posted by pankaj ojha <pa...@gmail.com>.
Hi Team,

Can you please provide any information on my above problem?

Thanks & Regards,
Pankaj Ojha


On Wed, Aug 6, 2014 at 8:04 PM, pankaj ojha <pa...@gmail.com> wrote:

> Hi Team,
>
> I have a requirement of reading real time data using kafka and write to
> cassandra.
> For this I am using SimpleConsumer to read data from Kafka topics and
> writing into Cassandra.
> I am maintaining offsets of topics in my log files.
> The issue is that after few days like 3-4 days my cosumer code does not
> read data from kafka topics and produce below log output :
>
> 20:01:17,068  INFO NposKafkaConsumer:48 - Taking partition from
> application properties
> 20:01:17,482 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT =
> 100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF =
> 64512 (requested -1).
> 20:01:17,545 DEBUG SimpleConsumer:52 - Disconnecting from <IP address>:9092
> 20:01:17,578 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() method
> Inside while loop :: Value of max_reads::1
> 20:01:17,662 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT =
> 100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF =
> 64512 (requested -1).
> 20:01:17,804 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
> 20:01:18,804 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() method
> Inside while loop :: Value of max_reads::1
> 20:01:18,826 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
> 20:01:19,827 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() method
> Inside while loop :: Value of max_reads::1
> 20:01:19,852 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
>
> NposKafkaConsumer is my main SimpleCosumer class.
> But, when I restart the kafka process by incrementing the offset by one
> then again my code starts running fine for next few days.
>
> Can you please help me how I can solve this and where I am going wrong ?
>
> Thanks & Regards,
> Pankaj Ojha
>
>

Fwd: Issue in using kafka SimpleConsumer code

Posted by pankaj ojha <pa...@gmail.com>.
Hi Team,

I have a requirement of reading real time data using kafka and write to
cassandra.
For this I am using SimpleConsumer to read data from Kafka topics and
writing into Cassandra.
I am maintaining offsets of topics in my log files.
The issue is that after few days like 3-4 days my cosumer code does not
read data from kafka topics and produce below log output :

20:01:17,068  INFO NposKafkaConsumer:48 - Taking partition from application
properties
20:01:17,482 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT =
100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF =
64512 (requested -1).
20:01:17,545 DEBUG SimpleConsumer:52 - Disconnecting from <IP address>:9092
20:01:17,578 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() method
Inside while loop :: Value of max_reads::1
20:01:17,662 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT =
100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF =
64512 (requested -1).
20:01:17,804 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
20:01:18,804 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() method
Inside while loop :: Value of max_reads::1
20:01:18,826 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping
20:01:19,827 DEBUG NposKafkaConsumer:113 - NposKafkaConsumer.run() method
Inside while loop :: Value of max_reads::1
20:01:19,852 DEBUG NposKafkaConsumer:193 - numRead::0 Sleeping

NposKafkaConsumer is my main SimpleCosumer class.
But, when I restart the kafka process by incrementing the offset by one
then again my code starts running fine for next few days.

Can you please help me how I can solve this and where I am going wrong ?

Thanks & Regards,
Pankaj Ojha