Posted to users@kafka.apache.org by navneet sharma <na...@gmail.com> on 2012/05/11 09:09:57 UTC

Exception from consumer

Hi,

I tried the following scenario:
1) Created a producer that sends messages to 3 topics.
2) Created 3 consumers in the same group for 1 topic, so the other 2 topics
should remain unread.
3) After running the consumer and producer successfully several times, I
decided to delete the log file because it had grown very large.
4) In effect, the unread messages for the other 2 topics were deleted.
5) I ran the above experiment again.
6) Then I changed the consumer code and created 3 consumers for each of the 3
topics in 3 different groups, so that I could now read messages for all 3
topics.

But after that, I am seeing the following exception in the broker log:
20736 [kafka-processor-0] ERROR kafka.server.KafkaRequestHandlers  - error when processing request FetchRequest(topic:orderTopic, part:0 offset:298534904 maxSize:307200)
kafka.common.OffsetOutOfRangeException: offset 298534904 is out of range
    at kafka.log.Log$.findRange(Log.scala:48)
    at kafka.log.Log.read(Log.scala:224)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$readMessageSet(KafkaRequestHandlers.scala:116)
    at kafka.server.KafkaRequestHandlers$$anonfun$2.apply(KafkaRequestHandlers.scala:106)
    at kafka.server.KafkaRequestHandlers$$anonfun$2.apply(KafkaRequestHandlers.scala:105)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
    at kafka.server.KafkaRequestHandlers.handleMultiFetchRequest(KafkaRequestHandlers.scala:105)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$3.apply(KafkaRequestHandlers.scala:45)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$3.apply(KafkaRequestHandlers.scala:45)
    at kafka.network.Processor.handle(SocketServer.scala:289)
    at kafka.network.Processor.read(SocketServer.scala:312)
    at kafka.network.Processor.run(SocketServer.scala:207)
    at java.lang.Thread.run(Thread.java:662)

And this exception on the consumer side:
12:27:36,259 [FetchRunnable-0] ERROR kafka.consumer.FetcherRunnable  - error in FetcherRunnable for orderTopic:1-1: fetched offset = 254633932: consumed offset = 254633932
kafka.common.InvalidMessageSizeException: invalid message size: 1681733685 only received bytes: 307196 at 254633932( possible causes (1) a single message larger than the fetch size; (2) log corruption )
        at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:103)
        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
        at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:65)
        at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:60)
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:62)
        at kafka.consumer.FetcherRunnable$$anonfun$run$3.apply(FetcherRunnable.scala:82)
        at kafka.consumer.FetcherRunnable$$anonfun$run$3.apply(FetcherRunnable.scala:68)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:68)


In fact, the consumer is killed after throwing this exception.

I suspect I got into trouble because I deleted the logs in between runs, and
the consumer is for some reason still trying to fetch messages from an older
offset. Is that the case?

How do I get past this problem?

Thanks,
Navneet Sharma
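
One hedged way to read the consumer-side exception above: assuming the
on-disk layout of that era, in which offsets are byte positions in the log
and every message starts with a 4-byte big-endian size field, a fetch that
begins in the middle of a message would treat payload bytes as a size. The
sketch below uses plain Java only, with no Kafka classes; SizeFieldDecoder
and asAscii are made-up names for illustration. It decodes the reported
"invalid message size" value as ASCII.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class SizeFieldDecoder {
    // Render the four bytes of a big-endian int as ASCII characters.
    // ByteBuffer is big-endian by default, matching the assumed size field.
    static String asAscii(int sizeField) {
        byte[] bytes = ByteBuffer.allocate(4).putInt(sizeField).array();
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // Value copied from the InvalidMessageSizeException in the consumer log.
        System.out.println(asAscii(1681733685)); // prints d=85
    }
}
```

The four bytes spell "d=85", which looks like message text rather than a
length. That is consistent with, though not proof of, a stale offset that
lands inside a message written after the log was deleted.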

Re: Exception from consumer

Posted by Jun Rao <ju...@gmail.com>.
You can stop iterating over messages by calling connector.shutdown. When
a consumer is restarted, it resumes from the last checkpointed offset in
ZK, provided the consumer group id is the same.

Jun
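
The shutdown advice above can be illustrated with a toy model. This is a
minimal sketch, not Kafka's connector: ToyConnector, deliver and
consumeUntilShutdown are hypothetical names, and the only point is the
pattern of unblocking a blocking consume loop from another thread instead
of killing the process.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class ToyConnector {
    // Sentinel object placed on the queue to end the consume loop.
    private static final Object SHUTDOWN = new Object();
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    final AtomicLong consumed = new AtomicLong();

    void deliver(String message) { queue.add(message); }

    // Ends the consume loop cleanly; safe to call from another thread.
    void shutdown() { queue.add(SHUTDOWN); }

    // The "infinite loop": blocks on take() until shutdown() is called.
    void consumeUntilShutdown() {
        try {
            while (true) {
                Object next = queue.take();
                if (next == SHUTDOWN) return;
                consumed.incrementAndGet(); // stand-in for real message handling
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and exit
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyConnector connector = new ToyConnector();
        Thread worker = new Thread(connector::consumeUntilShutdown);
        worker.start();
        connector.deliver("a");
        connector.deliver("b");
        connector.shutdown(); // could equally be called from a JVM shutdown hook
        worker.join();
        System.out.println(connector.consumed.get()); // prints 2
    }
}
```

In a real consumer the analogous step would be the connector.shutdown call
mentioned above, for example registered through
Runtime.getRuntime().addShutdownHook so that a normal termination signal
stops the loop.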

On Tue, May 15, 2012 at 6:04 AM, navneet sharma <navneetsharma0505@gmail.com> wrote:

> Jun,
>
> Deleting the ZooKeeper directory or the broker logs is problematic and
> should not be done.
>
> How about killing a consumer process? Since it's in an infinite loop, I
> can't see any other clean way to stop it.
>
> Also, if the same consumer is restarted, is it treated as the same previous
> consumer in ZooKeeper or as a different one? I checked the docs but it's not
> very clear to me.
>
> Thanks,
> Navneet Sharma

Re: Exception from consumer

Posted by navneet sharma <na...@gmail.com>.
Jun,

Deleting the ZooKeeper directory or the broker logs is problematic and
should not be done.

How about killing a consumer process? Since it's in an infinite loop, I
can't see any other clean way to stop it.

Also, if the same consumer is restarted, is it treated as the same previous
consumer in ZooKeeper or as a different one? I checked the docs but it's not
very clear to me.

Thanks,
Navneet Sharma

On Fri, May 11, 2012 at 8:07 PM, Jun Rao <ju...@gmail.com> wrote:

> Navneet,
>
> Normally, you shouldn't delete the broker log yourself (it's GC-ed based on
> the retention time configured at the broker). If this is for testing, then
> you need to clean up the ZK data too.
>
> Jun

Re: Exception from consumer

Posted by Jun Rao <ju...@gmail.com>.
Navneet,

Normally, you shouldn't delete the broker log yourself (it's GC-ed based on
the retention time configured at the broker). If this is for testing, then
you need to clean up the ZK data too.

Jun
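
To see why the ZK data matters after the log is deleted by hand, here is a
minimal sketch under stated assumptions. StaleOffsetDemo, startingOffset
and inRange are hypothetical names, the map is only a stand-in for the
offsets a consumer group checkpoints in ZooKeeper, and the size of the
recreated log is invented. (As I recall from the design documentation,
consumers of that era kept offsets under
/consumers/[group]/offsets/[topic]/[broker-partition]; treat that path as
an assumption and verify it against your version.)

```java
import java.util.HashMap;
import java.util.Map;

class StaleOffsetDemo {
    // Toy range check: a fetch offset is valid only inside [logStart, logEnd].
    static boolean inRange(long offset, long logStart, long logEnd) {
        return offset >= logStart && offset <= logEnd;
    }

    // Resume from the group's checkpoint if one exists; this toy otherwise
    // falls back to the start of the log.
    static long startingOffset(Map<String, Long> checkpoints, String group,
                               String topicPartition, long logStart) {
        return checkpoints.getOrDefault(group + "/" + topicPartition, logStart);
    }

    public static void main(String[] args) {
        Map<String, Long> checkpoints = new HashMap<>();
        String group = "group1";       // hypothetical group id
        String tp = "orderTopic/0";    // hypothetical key for topic and partition
        checkpoints.put(group + "/" + tp, 298534904L); // offset from the broker log

        long recreatedLogEnd = 1_000_000L; // invented size of the new, smaller log

        long resume = startingOffset(checkpoints, group, tp, 0L);
        System.out.println(inRange(resume, 0L, recreatedLogEnd)); // prints false

        checkpoints.remove(group + "/" + tp); // the "clean up the ZK data" step
        resume = startingOffset(checkpoints, group, tp, 0L);
        System.out.println(inRange(resume, 0L, recreatedLogEnd)); // prints true
    }
}
```

The checkpoint outlives the deleted log, so the first fetch asks for an
offset the recreated log does not contain; once the checkpoint is cleared,
the consumer starts from a valid position.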

On Fri, May 11, 2012 at 1:05 AM, navneet sharma <navneetsharma0505@gmail.com> wrote:

> I deleted the broker log file and that fixed the problem. But is there a
> better way to fix it?

Re: Exception from consumer

Posted by navneet sharma <na...@gmail.com>.
I deleted the broker log file and that fixed the problem. But is there a
better way to fix it?
