You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Stuart Reynolds <st...@stureynolds.com> on 2015/02/23 03:45:38 UTC

Latest offset is frozen

I'm finding that if I continuously produce values to a topic (say,
once every 2 seconds), and in another thread, query the head and tail
offsets of a topic, then sometimes I see the head offset increasing,
sometimes its frozen. What's up with that?

I'm using scala client: 0.8.2 and server: 2.9.2-0.8.1.1

I querying the head and tail offsets like this:

  private def getOffset(consumer: SimpleConsumer, topic: String,
partition: Int, whichTime: Long): Long = {
    val topicAndPartition = new TopicAndPartition(topic, partition);
    val response = consumer.earliestOrLatestOffset(
       topicAndPartition,
       earliestOrLatest = whichTime,
       consumerId= 0);
    return response;
  }

  case class HeadAndTailOffsets(head: Long, tail: Long)

  def getHeadAndTailOffsets(consumer: SimpleConsumer, topic: String,
partition: Int = 0): HeadAndTailOffsets =
    HeadAndTailOffsets(
      head = getOffset(consumer, topic, partition,
kafka.api.OffsetRequest.EarliestTime),
      tail = getOffset(consumer, topic, partition,
kafka.api.OffsetRequest.LatestTime))

-----
If I run a producer, consumer, and offset reporter threads. On the
first run I might get something like this:


-----
producer           consumer             offsets
                   offset,message           head,tail
  "MSG-0"           0, "MSG-0"              0,1
  "MSG-1"           1, "MSG-1"              0,2
  "MSG-2"           2, "MSG-2"              0,3
    ...              ...                    ...
-----
On subsequent runs, I might see something like this:
-----
     producer          consumer               offsets
                    offset,message          head,tail
      "MSG-0"          10, "MSG-0”             0,21 ** tail is frozen
      "MSG-1"          11, "MSG-1"             0,21
      "MSG-2"          12, "MSG-2”             0,21 ** lies, damn lies
       ..
      "MSG-31"         31,"MSG-21"             0,21
      "MSG-32"         31,"MSG-22"             0,21
        ...              ...                    ...
-----
i.e. the consumer sees increasing offsets with the received messages,
but the thread reporting the topic's head and tail offsets is frozen.

Is this a client bug or an issue with my usage?

I have a fuller code sample here:
  http://stackoverflow.com/questions/28663714/why-is-kafkas-latest-offset-report-sometimes-frozen

Thanks
- Stuart

Re: Latest offset is frozen

Posted by Stuart Reynolds <st...@stureynolds.com>.
Doh! Was assuming there was only 1 partition... Need to read all the partitions.

On Mon, Feb 23, 2015 at 3:21 PM, Jun Rao <ju...@confluent.io> wrote:
> Hmm, when the tail offset is frozen, does it freeze forever? Also, do you
> get the same frozen offset if you run the GetOffsetShell command?
>
> Thanks,
>
> Jun
>
> On Sun, Feb 22, 2015 at 6:45 PM, Stuart Reynolds <st...@stureynolds.com>
> wrote:
>
>> I'm finding that if I continuously produce values to a topic (say,
>> once every 2 seconds), and in another thread, query the head and tail
>> offsets of a topic, then sometimes I see the head offset increasing,
>> sometimes its frozen. What's up with that?
>>
>> I'm using scala client: 0.8.2 and server: 2.9.2-0.8.1.1
>>
>> I querying the head and tail offsets like this:
>>
>>   private def getOffset(consumer: SimpleConsumer, topic: String,
>> partition: Int, whichTime: Long): Long = {
>>     val topicAndPartition = new TopicAndPartition(topic, partition);
>>     val response = consumer.earliestOrLatestOffset(
>>        topicAndPartition,
>>        earliestOrLatest = whichTime,
>>        consumerId= 0);
>>     return response;
>>   }
>>
>>   case class HeadAndTailOffsets(head: Long, tail: Long)
>>
>>   def getHeadAndTailOffsets(consumer: SimpleConsumer, topic: String,
>> partition: Int = 0): HeadAndTailOffsets =
>>     HeadAndTailOffsets(
>>       head = getOffset(consumer, topic, partition,
>> kafka.api.OffsetRequest.EarliestTime),
>>       tail = getOffset(consumer, topic, partition,
>> kafka.api.OffsetRequest.LatestTime))
>>
>> -----
>> If I run a producer, consumer, and offset reporter threads. On the
>> first run I might get something like this:
>>
>>
>> -----
>> producer           consumer             offsets
>>                    offset,message           head,tail
>>   "MSG-0"           0, "MSG-0"              0,1
>>   "MSG-1"           1, "MSG-1"              0,2
>>   "MSG-2"           2, "MSG-2"              0,3
>>     ...              ...                    ...
>> -----
>> On subsequent runs, I might see something like this:
>> -----
>>      producer          consumer               offsets
>>                     offset,message          head,tail
>>       "MSG-0"          10, "MSG-0”             0,21 ** tail is frozen
>>       "MSG-1"          11, "MSG-1"             0,21
>>       "MSG-2"          12, "MSG-2”             0,21 ** lies, damn lies
>>        ..
>>       "MSG-31"         31,"MSG-21"             0,21
>>       "MSG-32"         31,"MSG-22"             0,21
>>         ...              ...                    ...
>> -----
>> i.e. the consumer sees increasing offsets with the received messages,
>> but the thread reporting the topic's head and tail offsets is frozen.
>>
>> Is this a client bug or an issue with my usage?
>>
>> I have a fuller code sample here:
>>
>> http://stackoverflow.com/questions/28663714/why-is-kafkas-latest-offset-report-sometimes-frozen
>>
>> Thanks
>> - Stuart
>>

Re: Latest offset is frozen

Posted by Jun Rao <ju...@confluent.io>.
Hmm, when the tail offset is frozen, does it freeze forever? Also, do you
get the same frozen offset if you run the GetOffsetShell command?

Thanks,

Jun

On Sun, Feb 22, 2015 at 6:45 PM, Stuart Reynolds <st...@stureynolds.com>
wrote:

> I'm finding that if I continuously produce values to a topic (say,
> once every 2 seconds), and in another thread, query the head and tail
> offsets of a topic, then sometimes I see the head offset increasing,
> sometimes its frozen. What's up with that?
>
> I'm using scala client: 0.8.2 and server: 2.9.2-0.8.1.1
>
> I querying the head and tail offsets like this:
>
>   private def getOffset(consumer: SimpleConsumer, topic: String,
> partition: Int, whichTime: Long): Long = {
>     val topicAndPartition = new TopicAndPartition(topic, partition);
>     val response = consumer.earliestOrLatestOffset(
>        topicAndPartition,
>        earliestOrLatest = whichTime,
>        consumerId= 0);
>     return response;
>   }
>
>   case class HeadAndTailOffsets(head: Long, tail: Long)
>
>   def getHeadAndTailOffsets(consumer: SimpleConsumer, topic: String,
> partition: Int = 0): HeadAndTailOffsets =
>     HeadAndTailOffsets(
>       head = getOffset(consumer, topic, partition,
> kafka.api.OffsetRequest.EarliestTime),
>       tail = getOffset(consumer, topic, partition,
> kafka.api.OffsetRequest.LatestTime))
>
> -----
> If I run a producer, consumer, and offset reporter threads. On the
> first run I might get something like this:
>
>
> -----
> producer           consumer             offsets
>                    offset,message           head,tail
>   "MSG-0"           0, "MSG-0"              0,1
>   "MSG-1"           1, "MSG-1"              0,2
>   "MSG-2"           2, "MSG-2"              0,3
>     ...              ...                    ...
> -----
> On subsequent runs, I might see something like this:
> -----
>      producer          consumer               offsets
>                     offset,message          head,tail
>       "MSG-0"          10, "MSG-0”             0,21 ** tail is frozen
>       "MSG-1"          11, "MSG-1"             0,21
>       "MSG-2"          12, "MSG-2”             0,21 ** lies, damn lies
>        ..
>       "MSG-31"         31,"MSG-21"             0,21
>       "MSG-32"         31,"MSG-22"             0,21
>         ...              ...                    ...
> -----
> i.e. the consumer sees increasing offsets with the received messages,
> but the thread reporting the topic's head and tail offsets is frozen.
>
> Is this a client bug or an issue with my usage?
>
> I have a fuller code sample here:
>
> http://stackoverflow.com/questions/28663714/why-is-kafkas-latest-offset-report-sometimes-frozen
>
> Thanks
> - Stuart
>