Posted to user@storm.apache.org by Tousif <to...@gmail.com> on 2015/03/09 10:57:02 UTC

Reading from last processed message offset (kafka spout)

Hello,

I'm trying to read messages from Kafka that were not processed while the
topology was offline, after restarting it a while later.

I tried the following config:

SpoutConfig spoutConfig = new SpoutConfig(hosts,
    PropertyManager.getProperty("kafka.spout.topic").toString(),
    "/" + PropertyManager.getProperty("kafka.spout.topic").toString(),
    UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.forceFromStart = true;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();


and

spoutConfig.forceFromStart = false;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();


Neither of them gives me the messages that were not read while the topology
was offline.

Any help?


-- 


Regards
khazi

Re: Reading from last processed message offset (kafka spout)

Posted by Tousif <to...@gmail.com>.
Thank you very much. After I created a static path for zkRoot and set
spoutConfig.forceFromStart to false, the spout now reads the last offset and
consumes the unread messages.
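
A minimal sketch of why a random id loses the saved offset. It is an illustrative model only, not storm-kafka code: it assumes the spout keeps each partition's offset in a ZooKeeper node built from zkRoot, then the id, then a per-partition suffix (the "partition_N" suffix is an assumption). The class name, method name, and the id "my-kafka-spout" are hypothetical.

```java
import java.util.UUID;

// Illustrative model only: this is not storm-kafka code. It assumes the
// spout stores each partition's offset under zkRoot + "/" + id + "/partition_N".
final class OffsetPathSketch {

    // Builds the (assumed) ZooKeeper node path for one partition's offset.
    static String offsetPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }

    public static void main(String[] args) {
        String zkRoot = "/kafkastormroot";

        // Static id: both deployments resolve to the same node, so the
        // second deployment can find the offset saved by the first.
        String firstDeploy = offsetPath(zkRoot, "my-kafka-spout", 0);
        String secondDeploy = offsetPath(zkRoot, "my-kafka-spout", 0);
        System.out.println("static id, same path: " + firstDeploy.equals(secondDeploy));

        // Random id: each deployment builds a different node path, so
        // there is never a previously saved offset to resume from.
        String randomFirst = offsetPath(zkRoot, UUID.randomUUID().toString(), 0);
        String randomSecond = offsetPath(zkRoot, UUID.randomUUID().toString(), 0);
        System.out.println("random id, same path: " + randomFirst.equals(randomSecond));
    }
}
```

Under this assumption, both zkRoot and id need to stay constant across deployments for the spout to resume from the stored offset.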

On Tue, Mar 10, 2015 at 11:53 AM, Harsha <st...@harsha.io> wrote:

> Yes, that's a bad approach. Most users keep a static string for the "id"
> part of the spoutConfig. What is the need to use randomUUID?
>
> --
> Harsha


-- 


Regards
Tousif Khazi

Re: Reading from last processed message offset (kafka spout)

Posted by Harsha <st...@harsha.io>.
Yes, that's a bad approach. Most users keep a static string for the "id" part of the spoutConfig. What is the need to use randomUUID?

-- 
Harsha


On March 9, 2015 at 11:09:46 PM, Tousif (tousif.pasha@gmail.com) wrote:

> Thanks Harsha,
>
> Is zkRoot in the SpoutConfig used along with the random string to create
> the path in ZooKeeper?
>
> SpoutConfig(hosts, PropertyManager.getProperty("kafka.spout.topic").toString(),
>     "/kafkastormroot", UUID.randomUUID().toString());
>
> zkRoot is prepended to the random id we give to create the unique place in
> ZooKeeper where consumption state is stored. If this is true, then every
> time we deploy our topology it will create a unique path?
>
> How will it get the previous offset?




Re: Reading from last processed message offset (kafka spout)

Posted by Tousif <to...@gmail.com>.
Thanks Harsha,

Is zkRoot in the SpoutConfig used along with the random string to create the
path in ZooKeeper?

SpoutConfig(hosts, PropertyManager.getProperty("kafka.spout.topic").toString(),
    "/kafkastormroot", UUID.randomUUID().toString());

zkRoot is prepended to the random id we give to create the unique place in
ZooKeeper where consumption state is stored. If this is true, then every time
we deploy our topology it will create a unique path?

How will it get the previous offset?


On Mon, Mar 9, 2015 at 8:36 PM, Harsha <st...@harsha.io> wrote:

> If your topology has saved a Kafka offset in ZooKeeper, it will start
> processing from that offset. Otherwise, it checks whether
> spoutConfig.forceFromStart is set to true; in that case it will try to
> fetch data from the beginning of the queue, i.e.
> kafka.api.OffsetRequest.EarliestTime(). If neither of the above applies, it
> will pick the user's spoutConfig.startOffsetTime.
>
> "kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in
> the logs and starts streaming from there,
> kafka.api.OffsetRequest.LatestTime() will only stream new messages."
>
> If you are stopping and redeploying the topology, make sure you use the
> same name, as KafkaSpout uses the topology name to store and retrieve the
> offsets from ZooKeeper.
>
> --
> Harsha
>


-- 


Regards
Tousif Khazi

Re: Reading from last processed message offset (kafka spout)

Posted by Harsha <st...@harsha.io>.
If your topology has saved a Kafka offset in ZooKeeper, it will start processing from that offset. Otherwise, it checks whether spoutConfig.forceFromStart is set to true; in that case it will try to fetch data from the beginning of the queue, i.e. kafka.api.OffsetRequest.EarliestTime(). If neither of the above applies, it will pick the user's spoutConfig.startOffsetTime.

"kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there, kafka.api.OffsetRequest.LatestTime() will only stream new messages."

If you are stopping and redeploying the topology, make sure you use the same name, as KafkaSpout uses the topology name to store and retrieve the offsets from ZooKeeper.
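
The precedence described above can be sketched as a small decision function. This is a model of the description in this reply, not the storm-kafka implementation, and the behaviour of a given storm-kafka release may differ. The class and method names are hypothetical; the two constants mirror the sentinel values that kafka.api.OffsetRequest.LatestTime() and EarliestTime() return (-1 and -2).

```java
import java.util.OptionalLong;

// Illustrative model of the precedence described in this reply; it is not
// the storm-kafka implementation.
final class StartOffsetSketch {

    // Sentinel values matching kafka.api.OffsetRequest.LatestTime() and
    // kafka.api.OffsetRequest.EarliestTime().
    static final long LATEST_TIME = -1L;
    static final long EARLIEST_TIME = -2L;

    // Returns "offset:N" when a saved offset is used, or "time:T" when the
    // start position must be resolved from a sentinel time.
    static String chooseStart(OptionalLong savedOffset,
                              boolean forceFromStart,
                              long startOffsetTime) {
        if (savedOffset.isPresent()) {
            // 1. An offset saved in ZooKeeper wins.
            return "offset:" + savedOffset.getAsLong();
        }
        if (forceFromStart) {
            // 2. No saved offset and forceFromStart set: beginning of the log.
            return "time:" + EARLIEST_TIME;
        }
        // 3. Otherwise fall back to the configured startOffsetTime.
        return "time:" + startOffsetTime;
    }

    public static void main(String[] args) {
        // Saved offset present: resume from it.
        System.out.println(chooseStart(OptionalLong.of(42L), false, LATEST_TIME));
        // No saved offset, forceFromStart=false, LatestTime: new messages only.
        System.out.println(chooseStart(OptionalLong.empty(), false, LATEST_TIME));
    }
}
```

In this model, a spout that never finds its saved offset and runs with forceFromStart=false and LatestTime() only sees new messages, which matches the symptom reported at the start of the thread.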

-- 
Harsha




Re: Reading from last processed message offset (kafka spout)

Posted by Tousif <to...@gmail.com>.
Harsha,

It's distributed. What is the significance of
kafka.api.OffsetRequest.LatestTime() with respect to reading from where Storm
left off last time?


On Mon, Mar 9, 2015 at 7:42 PM, Harsha <st...@harsha.io> wrote:

> Tousif,
>
> How did you deploy the topology? Is this a distributed Storm cluster or a
> LocalCluster? LocalCluster is a simulated Storm cluster that should only be
> used for dev purposes, when you are writing your topology and want to test
> it out. LocalCluster has an in-process ZooKeeper; when you shut down this
> cluster it deletes the contents of that ZooKeeper, so your Kafka offsets
> won't be available the next time you restart. As I said above, this should
> only be used for testing your topology.
>
> If you are using a distributed Storm cluster, submit the topology with
> spoutConfig.forceFromStart=true the first time if you want to read from the
> beginning of the queue. On subsequent redeployments, make sure you set
> spoutConfig.forceFromStart=false so that your topology picks up the Kafka
> offset from ZooKeeper and starts where it left off.
>
> -Harsha



-- 


Regards
Tousif Khazi

Re: Reading from last processed message offset (kafka spout)

Posted by Harsha <st...@harsha.io>.
Tousif,

How did you deploy the topology? Is this a distributed Storm cluster or a
LocalCluster? LocalCluster is a simulated Storm cluster that should only be
used for dev purposes, when you are writing your topology and want to test it
out. LocalCluster has an in-process ZooKeeper; when you shut down this cluster
it deletes the contents of that ZooKeeper, so your Kafka offsets won't be
available the next time you restart. As I said above, this should only be used
for testing your topology.

If you are using a distributed Storm cluster, submit the topology with
spoutConfig.forceFromStart=true the first time if you want to read from the
beginning of the queue. On subsequent redeployments, make sure you set
spoutConfig.forceFromStart=false so that your topology picks up the Kafka
offset from ZooKeeper and starts where it left off.

-Harsha

On Mon, Mar 9, 2015, at 02:59 AM, Tousif wrote:
> Since the local cluster has an in-process ZooKeeper, I tried it in a
> distributed cluster, but I was not able to get those messages.


Re: Reading from last processed message offset (kafka spout)

Posted by Tousif <to...@gmail.com>.
Since the local cluster has an in-process ZooKeeper, I tried it in a
distributed cluster, but I was not able to get those messages.



-- 


Regards
Tousif Khazi