Posted to user@storm.apache.org by Chitra Raveendran <ch...@flutura.com> on 2014/03/06 11:04:37 UTC

Re: How to consume from last offset when topology restarts (STORM-KAFKA)

Hi

Sorry for the late reply; I only got time to experiment today, and
realized that forceStartOffsetTime is not accepting a
timestamp (milliseconds) value as a parameter.

This doesn't seem to work. I'm using the Kafka spout from storm-contrib,
and it is a normal Storm topology, not a Trident topology!
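For anyone following along, the values being discussed (-1, -2, or a
millisecond timestamp) are sentinels. A minimal sketch of the convention,
as an illustration only (it mirrors the kafka.api.OffsetRequest constants;
this is not the actual storm-kafka spout code):

```java
// Illustration only: the conventional meaning of values passed to
// forceStartOffsetTime (mirroring kafka.api.OffsetRequest.LatestTime()
// and EarliestTime()); NOT the actual storm-kafka spout code.
public class OffsetStartMode {
    static final long LATEST = -1L;   // start from the newest message
    static final long EARLIEST = -2L; // start from the beginning of the topic

    // Describe where consumption would start for a given value.
    static String describe(long forceStartOffsetTime) {
        if (forceStartOffsetTime == LATEST) return "latest offset";
        if (forceStartOffsetTime == EARLIEST) return "earliest offset";
        return "first offset at or after " + forceStartOffsetTime + " ms";
    }

    public static void main(String[] args) {
        System.out.println(describe(-1L)); // prints "latest offset"
        System.out.println(describe(-2L)); // prints "earliest offset"
        System.out.println(describe(1392588000000L));
    }
}
```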

Regards
Chitra


On Mon, Feb 17, 2014 at 1:26 AM, Chitra Raveendran <
chitra.raveendran@flutura.com> wrote:

> Yes, I tried without the forced offset time parameter, but the topology
> stopped consuming messages.
>
> Regards
> Chitra
>  On Feb 17, 2014 1:24 AM, "P. Taylor Goetz" <pt...@gmail.com> wrote:
>
>> If you turn off forceStartOffsetTime, it should resume from the last
>> offset stored in ZooKeeper.
>>
>> - Taylor
>>
>> On Feb 16, 2014, at 12:35 PM, Chitra Raveendran <
>> chitra.raveendran@flutura.com> wrote:
>>
>> Hi
>>
>> So according to this logic, I should set the timestamp parameter to the
>> time when the topology was stopped?
>>
>> But how do we identify the exact moment when the topology went down, so
>> that Storm could start consuming from then? Is it based on approximation?
>> Or is there some concrete way to find the exact moment when the topology
>> was down?
>>
>> Is there any other parameter which is based on last offset and not time?
>>
>> Regards
>> Chitra
>> On Feb 16, 2014 11:00 PM, "Vinoth Kumar Kannan" <vi...@gmail.com>
>> wrote:
>>
>>> The forceStartOffsetTime value can be -2, -1, or a timestamp in
>>> milliseconds:
>>>
>>>    - -1 to read from the latest offset of the topic
>>>    - -2 to read from the beginning of the topic
>>>    - a timestamp to read from a specific time
>>>
>>>
>>>
>>> On Sun, Feb 16, 2014 at 6:15 PM, Chitra Raveendran <
>>> chitra.raveendran@flutura.com> wrote:
>>>
>>>> Anybody? Any answers?
>>>>
>>>> I'm sure someone would have a workaround!
>>>>
>>>> Please help :)
>>>> On Feb 15, 2014 12:11 AM, "Andrey Yegorov" <an...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have exactly the same question.
>>>>> I am using kafka spout from
>>>>> https://github.com/wurstmeister/storm-kafka-0.8-plus.git with kafka
>>>>> 0.8 release and ordinary (non-trident) storm topology.
>>>>>
>>>>> How can I guarantee processing of messages sent while topology was
>>>>> down or while e.g. storm cluster was down for maintenance?
>>>>>
>>>>> ----------
>>>>> Andrey Yegorov
>>>>>
>>>>>
>>>>> On Wed, Feb 12, 2014 at 8:05 AM, Danijel Schiavuzzi <
>>>>> dschiavu@gmail.com> wrote:
>>>>>
>>>>>> Hi Chitra,
>>>>>>
>>>>>> Which Kafka spout version are you exactly using, and what spout type
>>>>>> -- Trident or the ordinary Storm spout?
>>>>>>
>>>>>> I ask that because, unfortunately, there are multiple Kafka spout
>>>>>> versions around the web. According to my research, your best bet is the one
>>>>>> in storm-contrib in case you use Kafka version 0.7, and
>>>>>> storm-kafka-0.8-plus in case you use Kafka 0.8.
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> Danijel Schiavuzzi
>>>>>> www.schiavuzzi.com
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 12, 2014 at 8:42 AM, Chitra Raveendran <
>>>>>> chitra.raveendran@flutura.com> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> I have a topology in production which uses the default Kafka spout,
>>>>>>> and I have set this parameter:
>>>>>>> *spoutConfig.forceStartOffsetTime(-1);*
>>>>>>>
>>>>>>> Passing -1 makes the spout consume from the latest message instead
>>>>>>> of reading data from Kafka right from the beginning (which would be
>>>>>>> unnecessary and redundant in my use case).
>>>>>>>
>>>>>>> But in production, whenever a new release goes in, I stop and start
>>>>>>> the topology, which takes a few seconds to minutes. I have been
>>>>>>> losing out on some data during the time that the topology is down.
>>>>>>>
>>>>>>> How can I avoid this? I have tried running without the
>>>>>>> forceStartOffsetTime parameter, but that did not work. What am I
>>>>>>> doing wrong, and how can I continue reading from the last offset?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Chitra
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Danijel Schiavuzzi
>>>>>>
>>>>>
>>>>>
>>>
>>>
>>> --
>>> With Regards,
>>> Vinoth Kumar K
>>>
>>
>>

Re: How to consume from last offset when topology restarts (STORM-KAFKA)

Posted by Chitra Raveendran <ch...@flutura.com>.
Just adding to my observation: forceStartOffsetTime does accept a
timestamp value in milliseconds, but it seems to behave just as it
would if I had passed -1. It is reading only from the current offset
and not from the input timestamp!
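The ZooKeeper-resume behavior Taylor mentions earlier in the thread boils
down to "use the committed offset when one exists, otherwise fall back to
the forced start". A minimal sketch of that fallback logic, as an
illustration only (a HashMap stands in for ZooKeeper, and none of these
names come from the actual storm-kafka code):

```java
// Sketch of resume-from-last-committed-offset: when no start time is
// forced, prefer the offset committed before shutdown. Illustration
// only; a HashMap stands in for ZooKeeper state.
import java.util.HashMap;
import java.util.Map;

public class OffsetResume {
    // Committed offsets keyed by "topic/partition" (stand-in for ZK state).
    static final Map<String, Long> committed = new HashMap<>();

    // Prefer the committed offset if one exists; otherwise use the
    // forced/default starting offset.
    static long startOffset(String partition, long fallback) {
        Long saved = committed.get(partition);
        return saved != null ? saved : fallback;
    }

    public static void main(String[] args) {
        committed.put("mytopic/0", 4711L); // offset saved before shutdown
        System.out.println(startOffset("mytopic/0", 0L)); // prints 4711
        System.out.println(startOffset("mytopic/1", 0L)); // prints 0
    }
}
```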

I guess the kafka spout I'm using only handles two conditions as input,
i.e. either start from the beginning or start from the next offset!
It would be great if someone could help.

Thanks
Chitra


On Thu, Mar 6, 2014 at 3:34 PM, Chitra Raveendran <
chitra.raveendran@flutura.com> wrote:

> Hi
>
> Sorry for the late reply; I only got time to experiment today, and
> realized that forceStartOffsetTime is not accepting a timestamp
> (milliseconds) value as a parameter.
>
> This doesn't seem to work. I'm using the Kafka spout from storm-contrib,
> and it is a normal Storm topology, not a Trident topology!
>
> Regards
> Chitra


-- 

Regards,

*Chitra Raveendran*
*Data Scientist*
Mobile: +91 819753660│*Email:* chitra.raveendran@flutura.com
*Flutura Business Solutions Private Limited – “A Decision Sciences &
Analytics Company”*│ #693, 2nd Floor, Geetanjali, 15th Cross, J.P
Nagar 2nd Phase,
Bangalore – 560078│