Posted to users@kafka.apache.org by Ye Hong <Ye...@audiencescience.com> on 2015/09/09 23:17:52 UTC

Resetting consumer offsets after moving to offset.storage=kafka

Hi,

We have a consumer that may lose data under certain circumstances. To guard against this, we have a tool that periodically pulls offsets from ZooKeeper and stores them. When data loss occurs, we use the stored historical offsets to reset the consumer offsets in ZooKeeper.
With offset.storage=zookeeper, the tool simply calls kafka-run-class.sh with kafka.tools.ExportZkOffsets or kafka.tools.ImportZkOffsets. However, after moving to offset.storage=kafka, we can no longer use ExportZkOffsets/ImportZkOffsets.
For offset export, I suppose we can call the Burrow REST API to get the same results. However, I couldn't find an easy way to reset offsets that is comparable to ImportZkOffsets. Could someone shed some light on what we should do?

Thanks!
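
The snapshot-and-restore idea described above can be sketched independently of where the offsets are stored. The following is a minimal Python sketch, not the poster's actual tool: the class name OffsetHistory and its methods are hypothetical, and it only covers keeping timestamped snapshots and picking the last snapshot taken at or before the time of a data loss. Fetching offsets and committing the chosen offsets back to Kafka are deliberately left out.

```python
import bisect
from collections import defaultdict


class OffsetHistory:
    """Hypothetical in-memory store of timestamped offset snapshots.

    Snapshots are keyed by (group, topic, partition) and kept sorted
    by timestamp so the lookup can use binary search.
    """

    def __init__(self):
        self._times = defaultdict(list)    # key -> sorted timestamps
        self._offsets = defaultdict(list)  # key -> offsets, same order

    def record(self, group, topic, partition, timestamp, offset):
        """Store one snapshot, keeping both lists sorted by timestamp."""
        key = (group, topic, partition)
        i = bisect.bisect_right(self._times[key], timestamp)
        self._times[key].insert(i, timestamp)
        self._offsets[key].insert(i, offset)

    def offsets_before(self, group, loss_time):
        """Return {(topic, partition): offset} for the given group.

        For each partition, the offset comes from the latest snapshot
        taken at or before loss_time. Partitions with no snapshot that
        old are omitted.
        """
        result = {}
        for (g, topic, partition), times in self._times.items():
            if g != group:
                continue
            i = bisect.bisect_right(times, loss_time)
            if i == 0:
                continue  # no snapshot old enough for this partition
            result[(topic, partition)] = self._offsets[(g, topic, partition)][i - 1]
        return result
```

A tool built this way would call record() on each periodic pull and, after a data loss, pass the result of offsets_before() to whatever mechanism commits offsets for the group.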

Re: Resetting consumer offsets after moving to offset.storage=kafka

Posted by Ye Hong <Ye...@audiencescience.com>.
Thank you very much, Erik.
Yes, the SimpleConsumer would definitely achieve the goal. It was easier with ImportZkOffsets, as that only takes a couple of command lines. If there is no equivalent of ImportZkOffsets, I will go with the SimpleConsumer.

As for the data loss, it took place in downstream processing, when data were accidentally deleted. It had nothing to do with Kafka. We just thought it might be a good idea to build tools that let us recover from future data loss.

Best,

Ye

> On Sep 9, 2015, at 2:34 PM, Helleren, Erik <Er...@cmegroup.com> wrote:
> 
> It is possible to commit offsets to Kafka or ZooKeeper for any group ID,
> topic, and partition tuple using the SimpleConsumer API.  There are some
> difficulties with the SimpleConsumer, but it should be able to make the
> call from within your app.  See the Scaladoc here:
> http://apache.mirrorcatalogs.com/kafka/0.8.2-beta/scala-doc/index.html#kafka.javaapi.consumer.SimpleConsumer
> and look for the commitOffsets function.
> 
> 
> I am curious: in what situations does the data loss occur?
> -Erik  
> 
> 


Re: Resetting consumer offsets after moving to offset.storage=kafka

Posted by "Helleren, Erik" <Er...@cmegroup.com>.
It is possible to commit offsets to Kafka or ZooKeeper for any group ID,
topic, and partition tuple using the SimpleConsumer API.  There are some
difficulties with the SimpleConsumer, but it should be able to make the
call from within your app.  See the Scaladoc here:
http://apache.mirrorcatalogs.com/kafka/0.8.2-beta/scala-doc/index.html#kafka.javaapi.consumer.SimpleConsumer
and look for the commitOffsets function.


I am curious: in what situations does the data loss occur?
-Erik  

