Posted to user@storm.apache.org by Manoj Jaiswal <ma...@gmail.com> on 2014/10/20 22:28:42 UTC

storm rebalancing losing data

Hi,

Let me explain my use case in brief:

Kafka spout A picks up messages from Kafka topic-A and creates queries
via Esper in Storm bolt B. This happens only once, right after the
topology is deployed.
Another Kafka spout C picks up realtime messages from Kafka topic-C,
which are processed by the Esper engine in the same bolt B.

The spout data from A and C are both partitioned by account number, so
that the Esper engine in a given worker process always sees the same
account numbers.
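
For reference, the wiring looks roughly like this (a trimmed sketch; the
spout variables, component ids and the "account" field name are
placeholders, and EsperBolt is our bolt class):

    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public static StormTopology buildTopology(IRichSpout kafkaSpoutA,
                                              IRichSpout kafkaSpoutC) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout-a", kafkaSpoutA, 1);  // topic-A: query definitions
        builder.setSpout("spout-c", kafkaSpoutC, 4);  // topic-C: realtime events
        // fieldsGrouping on the account number sends all tuples for a given
        // account, from both streams, to the same instance of bolt B.
        builder.setBolt("bolt-b", new EsperBolt(), 4)
               .fieldsGrouping("spout-a", new Fields("account"))
               .fieldsGrouping("spout-c", new Fields("account"));
        return builder.createTopology();
    }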

Now the problem:
If worker threads die for some reason, or a supervisor node gets kicked
out of the cluster, the bolt instance may get assigned to a new
process/worker. The bolt's prepare method initializes the Esper query
configuration, so the Esper engine in that worker process is re-created
from scratch every time. It therefore loses the queries that were set up
by the one-time messages from Kafka spout A.
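
To make the failure mode concrete, the bolt is shaped roughly like this
(a minimal sketch; the class and provider names are just illustrative):

    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    import com.espertech.esper.client.Configuration;
    import com.espertech.esper.client.EPServiceProvider;
    import com.espertech.esper.client.EPServiceProviderManager;

    public class EsperBolt extends BaseRichBolt {
        private transient EPServiceProvider esper;
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            // prepare() runs again whenever the task is reassigned to a new
            // worker, so a fresh, empty Esper engine is created each time and
            // the EPL statements registered from topic-A messages are lost.
            esper = EPServiceProviderManager.getProvider(
                    "esper-" + context.getThisTaskId(), new Configuration());
        }

        @Override
        public void execute(Tuple tuple) {
            // topic-A tuples register EPL queries on the engine;
            // topic-C tuples are fed into it as events.
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) { }
    }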

Any suggestions on how to handle this?

-Manoj

Re: storm rebalancing losing data

Posted by Manoj Jaiswal <ma...@gmail.com>.
Thanks Yair.
It's not stateful processing as such. One set of messages, which flows
once an hour, defines the queries per account number. The high-volume
messages then flow all the time and are processed in the bolt. So it is
stateful for the first set of messages and stateless for the second.

I will go through your suggestion. The only concern is that a lookup on
every call would be quite expensive. I think I can set a flag in the
prepare method that indicates whether the topic-A messages have already
been loaded (see the sketch below).
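
Something along these lines (just a sketch; queryStore is a hypothetical
wrapper around whatever external storage we pick, holding the EPL
strings built from the topic-A messages):

    private boolean queriesLoaded;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        esper = EPServiceProviderManager.getProvider(
                "esper-" + context.getThisTaskId(), new Configuration());
        // queryStore is hypothetical; it returns the persisted EPL strings.
        List<String> savedQueries = queryStore.loadAll();
        queriesLoaded = !savedQueries.isEmpty();
        if (queriesLoaded) {
            // Failover restart: re-register the queries instead of waiting
            // for the next hourly batch from topic-A.
            for (String epl : savedQueries) {
                esper.getEPAdministrator().createEPL(epl);
            }
        }
    }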

Thanks again,
--Manoj

Re: storm rebalancing losing data

Posted by Yair Weinberger <ya...@gmail.com>.
You can save the state per partition (I am not sure what your partition
key is, but you should be able to use it as a key into a key-value
store). Then, when you receive a message from topic C, check whether you
already have the state for that message's partition, and if not, fetch
it from the store. Almost always the state will already be there, so
this should not cause a performance problem.

The details are somewhat vague, but it sounds like you are doing
stateful processing, which in my opinion is much more natural with
Trident.
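
Concretely, the check in the bolt's execute could look like this (a
sketch; the "account" field, queryStore, and tupleToEvent are
placeholders for your partition key, storage, and tuple-to-event
mapping):

    private final Set<String> loadedAccounts = new HashSet<String>();

    @Override
    public void execute(Tuple tuple) {
        String account = tuple.getStringByField("account");
        if (!loadedAccounts.contains(account)) {
            // First message for this account since the worker (re)started:
            // fetch its persisted EPL queries from the key-value store.
            for (String epl : queryStore.load(account)) {
                esper.getEPAdministrator().createEPL(epl);
            }
            loadedAccounts.add(account);
        }
        esper.getEPRuntime().sendEvent(tupleToEvent(tuple));
        collector.ack(tuple);
    }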

Re: storm rebalancing losing data

Posted by Manoj Jaiswal <ma...@gmail.com>.
Thanks Yair,

The messages picked up from topic A create the queries. Those messages
are partitioned, and so are the realtime messages from topic C.
If I persist the state, how do I get back the same partitioned data?
The partitioning is dynamic and depends on which worker nodes are alive,
isn't it? So even if we read the state back, how do we make sure it is
the same relative state, since the realtime data may now be flowing to
another bolt instance because of repartitioning?

-Manoj

Re: storm rebalancing losing data

Posted by Yair Weinberger <ya...@gmail.com>.
Hi,
It sounds like your bolt actually has state (initialized by the messages
picked up from topic A). When a bolt is restarted after a failover,
Storm does not provide any inherent mechanism to restore the bolt's
previous state.

In my opinion, your best option would be to move to Trident, which
provides the notion of state; see
https://storm.apache.org/documentation/Trident-state.

Alternatively, you can use any external storage (e.g. MongoDB or
memcached) to save your state. After processing the messages from topic
A, write your state to the external storage. Then read it back in the
prepare method: it will be empty if the topology was just started, or
contain the previously written data if this is a failover restart.
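
A minimal sketch of that pattern, assuming memcached via the
spymemcached client (the key name, host, and the account-to-EPL map are
just for illustration):

    import java.net.InetSocketAddress;
    import java.util.HashMap;
    import java.util.Map;
    import net.spy.memcached.MemcachedClient;

    private transient MemcachedClient memcached;
    private final Map<String, String> queries = new HashMap<String, String>();

    // Called from execute() for each topic-A tuple, after the EPL has been
    // registered on the Esper engine.
    private void persistQuery(String account, String epl) {
        queries.put(account, epl);
        memcached.set("bolt-b/queries", 0, new HashMap<String, String>(queries));
    }

    // Called from prepare(): the map is absent on a fresh deployment and
    // present on a failover restart.
    @SuppressWarnings("unchecked")
    private void restoreQueries() throws Exception {
        memcached = new MemcachedClient(new InetSocketAddress("memcached-host", 11211));
        Map<String, String> saved = (Map<String, String>) memcached.get("bolt-b/queries");
        if (saved != null) {
            for (String epl : saved.values()) {
                esper.getEPAdministrator().createEPL(epl);
            }
        }
    }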

Take a look at MongoBolt for some ideas (
https://github.com/stormprocessor/storm-mongo/)

  Yair
  http://www.alooma.io
