Posted to dev@flume.apache.org by Attila Simon <sa...@cloudera.com> on 2016/10/13 09:45:46 UTC

Re: how to make KafkaSource consume the existing messages

for the records cc dev@

On Thu, Oct 13, 2016 at 11:43 AM, Attila Simon <sa...@cloudera.com> wrote:
> Hi,
>
> auto.offset.reset is meant to handle the failure scenario where Flume has
> lost track of the offsets. Whenever Flume successfully consumes messages it
> also commits the last processed offset, and on restart the consumer resumes
> from that committed value; <earliest> only takes effect when no committed
> offset can be found for the group.
> I don't think always starting from offset "zero" would be valuable (it
> would produce a lot of duplicates), so I assume what you want is a recovery
> scenario. What you can do is set the consumer group.id to something new:
> since offsets are stored per group.id, Flume will then reset the offset to
> the effective beginning. You can check whether Kafka still has the old
> messages with the command line kafka consumer and its --from-beginning
> argument, as Kafka purges them periodically by default.
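> For example, something along these lines might do it (broker address, topic
> and the new group name below are only placeholders, and depending on your
> Kafka version the console consumer may need --zookeeper instead of
> --bootstrap-server):
>
> # 1) check whether Kafka still retains the old messages
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
>     --topic mytopic --from-beginning
>
> # 2) point the Flume source at a brand new group.id with no committed offsets
> tier2.sources.source1.kafka.consumer.group.id = flume-replay-1
> tier2.sources.source1.kafka.consumer.auto.offset.reset = earliest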
>
> Quoted from Kafka docs
> (http://kafka.apache.org/documentation#newconsumerconfigs):
> auto.offset.reset - What to do when there is no initial offset in
> Kafka or if the current offset does not exist any more on the server
> (e.g. because that data has been deleted):
>
> earliest: automatically reset the offset to the earliest offset
> latest: automatically reset the offset to the latest offset
> none: throw exception to the consumer if no previous offset is found
> for the consumer's group
> anything else: throw exception to the consumer.
>
> Cheers,
> Attila
>
>
> On Thu, Oct 13, 2016 at 10:00 AM, Ping PW Wang <wp...@cn.ibm.com> wrote:
>> Hi,
>> I used KafkaSource to consume messages from Kafka and found that only new
>> messages were received, while the old existing messages were not. I tried
>> using a new consumer group and updating the parameter "auto.offset.reset"
>> from "latest" to "earliest", but this does not work.
>>
>> tier2.sources.source1.kafka.consumer.group.id = test-consumer-group-new
>> tier2.sources.source1.kafka.consumer.auto.offset.reset = earliest
>>
>> Does anyone know how to make KafkaSource consume the existing messages?
>> Thanks a lot for any advice!
>>
>> Best Regards,
>>
>> Wang Ping (王苹)
>> InfoSphere BigInsights, CDL
>> Email: wpwang@cn.ibm.com Phone: (8610)82453448 Mobile: (86)17090815725
>> Address: Ring Bldg.No.28 Building,ZhongGuanCun Software Park,No.8 Dong Bei
>> Wang West Road, Haidian District Beijing P.R.China 100193
>> 地址:北京市海淀区东北旺西路8号,中关村软件园28号楼 邮编:100193
>>

Re: how to make KafkaSource consume the existing messages

Posted by Attila Simon <sa...@cloudera.com>.
Hi,

One more thing: if you switch to the new group.id and would like to keep the
read-from-beginning behaviour on every Flume restart, then you might try
setting enable.auto.commit to false. Again, note that Kafka normally won't
store the events indefinitely.
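A rough sketch of how that could look on the source, reusing the
kafka.consumer.* passthrough from your config (the group name is only a
placeholder, and exact restart behaviour may differ between Flume versions):

# fresh group with no committed offsets, start from the earliest offset,
# and skip auto offset commits so the next restart reads from the beginning again
tier2.sources.source1.kafka.consumer.group.id = flume-replay-1
tier2.sources.source1.kafka.consumer.auto.offset.reset = earliest
tier2.sources.source1.kafka.consumer.enable.auto.commit = false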

Cheers,
Attila

