Posted to users@kafka.apache.org by Stuart Reynolds <st...@stureynolds.com> on 2015/02/23 03:45:38 UTC
Latest offset is frozen
I'm finding that if I continuously produce values to a topic (say,
once every 2 seconds) and, in another thread, query the head and tail
offsets of the topic, then sometimes I see the tail offset increasing
and sometimes it's frozen. What's up with that?
I'm using Scala client 0.8.2 and server 2.9.2-0.8.1.1.
I'm querying the head and tail offsets like this:
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// whichTime is OffsetRequest.EarliestTime or OffsetRequest.LatestTime.
private def getOffset(consumer: SimpleConsumer, topic: String,
                      partition: Int, whichTime: Long): Long = {
  val topicAndPartition = new TopicAndPartition(topic, partition)
  consumer.earliestOrLatestOffset(
    topicAndPartition,
    earliestOrLatest = whichTime,
    consumerId = 0)
}

case class HeadAndTailOffsets(head: Long, tail: Long)

// Head = earliest available offset, tail = latest offset, for one partition.
def getHeadAndTailOffsets(consumer: SimpleConsumer, topic: String,
                          partition: Int = 0): HeadAndTailOffsets =
  HeadAndTailOffsets(
    head = getOffset(consumer, topic, partition,
      kafka.api.OffsetRequest.EarliestTime),
    tail = getOffset(consumer, topic, partition,
      kafka.api.OffsetRequest.LatestTime))
-----
If I run producer, consumer, and offset-reporter threads, on the
first run I might get something like this:
-----
producer    consumer          offsets
            offset,message    head,tail
"MSG-0"     0, "MSG-0"        0,1
"MSG-1"     1, "MSG-1"        0,2
"MSG-2"     2, "MSG-2"        0,3
...         ...               ...
-----
On subsequent runs, I might see something like this:
-----
producer    consumer          offsets
            offset,message    head,tail
"MSG-0"     10, "MSG-0"       0,21   ** tail is frozen
"MSG-1"     11, "MSG-1"       0,21
"MSG-2"     12, "MSG-2"       0,21   ** lies, damn lies
..
"MSG-31"    31,"MSG-21"       0,21
"MSG-32"    31,"MSG-22"       0,21
...         ...               ...
-----
i.e. the consumer sees increasing offsets with the received messages,
but the tail offset reported by the offset-reporting thread stays frozen.
Is this a client bug or an issue with my usage?
I have a fuller code sample here:
http://stackoverflow.com/questions/28663714/why-is-kafkas-latest-offset-report-sometimes-frozen
Thanks
- Stuart
Re: Latest offset is frozen
Posted by Stuart Reynolds <st...@stureynolds.com>.
Doh! Was assuming there was only 1 partition... Need to read all the partitions.
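For anyone hitting the same thing, here is a minimal sketch of what
"read all the partitions" could look like. It deliberately avoids the
Kafka API: the `fetch` argument is a stand-in for a per-partition lookup
such as getHeadAndTailOffsets(consumer, topic, p) from the original post,
and PartitionOffsets, perPartition, and totalRetained are hypothetical
names invented for this illustration. With a real SimpleConsumer, each
partition's lookup probably has to go to that partition's leader broker,
which this sketch does not handle.

```scala
// Mirrors the case class from the original post.
case class HeadAndTailOffsets(head: Long, tail: Long)

// Hypothetical helper object; not part of the Kafka client.
object PartitionOffsets {

  // Run the per-partition lookup for every partition, not just partition 0.
  // `fetch` is assumed to wrap something like
  // getHeadAndTailOffsets(consumer, topic, partition).
  def perPartition(partitions: Seq[Int])(
      fetch: Int => HeadAndTailOffsets): Map[Int, HeadAndTailOffsets] =
    partitions.map(p => p -> fetch(p)).toMap

  // Sum of (tail - head) over all partitions, i.e. how many offsets
  // are currently available across the whole topic.
  def totalRetained(offsets: Map[Int, HeadAndTailOffsets]): Long =
    offsets.values.map(o => o.tail - o.head).sum
}
```

With this shape, a tail offset that looks frozen on partition 0 shows up
next to a growing tail on whichever partition the producer is actually
writing to.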
On Mon, Feb 23, 2015 at 3:21 PM, Jun Rao <ju...@confluent.io> wrote:
> Hmm, when the tail offset is frozen, does it freeze forever? Also, do you
> get the same frozen offset if you run the GetOffsetShell command?
>
> Thanks,
>
> Jun
Re: Latest offset is frozen
Posted by Jun Rao <ju...@confluent.io>.
Hmm, when the tail offset is frozen, does it freeze forever? Also, do you
get the same frozen offset if you run the GetOffsetShell command?
Thanks,
Jun