Posted to user@storm.apache.org by Spico Florin <sp...@gmail.com> on 2014/01/14 15:29:04 UTC

Rebalance and Distributed load on the workers

Hello!
>   I'm a newbie to Storm and I have some questions regarding scaling the
> number of workers/executors across a cluster and how data is correctly
> handled between them.
>    In the case of the WordCountTopology, the WordCount bolt is used to
> count the words in a text. From my observations and understanding, each
> task uses an internal Map to keep the counts for the words that arrive
> at it.
>
> public static class WordCount extends BaseBasicBolt {
>     // Per-task, in-memory state: it lives only inside this task's worker.
>     Map<String, Integer> counts = new HashMap<String, Integer>();
>
>     @Override
>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>       String word = tuple.getString(0);
>       Integer count = counts.get(word);
>       if (count == null)
>         count = 0;
>       count++;
>       counts.put(word, count);
>       collector.emit(new Values(word, count));
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>       declarer.declare(new Fields("word", "count"));
>     }
> }
>
>
>  From the configuration of the topology:
>
> builder.setBolt("count", new WordCount(), 6).fieldsGrouping("split", new
> Fields("word"));
>
> I understand that, because of the fields grouping, each task will keep
> receiving the same words it received the first time (plus new ones), thus
> keeping the counts consistent (no task will receive words that were
> processed by a different task).
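>
> (Conceptually, fields grouping picks the target task by hashing the
> grouping field(s) modulo the number of tasks. The sketch below is only an
> illustration of that idea, not Storm's actual internal code:
>
> // Illustration only -- not Storm's real implementation.
> int chooseTask(String word, int numTasks) {
>     // The mask keeps the index non-negative; the same word always
>     // maps to the same task, so each task sees a stable subset of
>     // the words as long as the number of tasks stays the same.
>     return (word.hashCode() & Integer.MAX_VALUE) % numTasks;
> }
>
> Rebalancing changes how tasks are spread over workers, but not the number
> of tasks, so the word-to-task mapping itself is stable.)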
>
> Now suppose that:
> - you have a huge text with a small number of distinct words (let's say
> you have 10MB of text containing only the words one, two, three, four,
> five, six, seven, eight, nine, ten);
> - you start the topology with 2 workers;
> - at some moment in time (after all the words have been distributed
> through the tasks and already have counts), we add one more worker and
> rebalance (e.g. with the command sketched below).
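>
> (Something like the following, where the topology name "word-count" is an
> assumption and -n sets the new total number of workers:
>
>   storm rebalance word-count -n 3
> )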
> Here are my questions:
> 1. When we re-balance our topology, will the newly added worker get data?
> 2. If the new worker gets data, how are the word counts kept consistent,
> given that it will receive some words that were already processed? Are the
> count values for the already-processed words handed over to the newly
> created worker?
> 3. Given that I don't have a remote cluster installed, are my assumptions
> correct?
>
> I look forward to your answers and opinions.
>
> Thanks.
>
> Regards,
>   Florin
>
> Example.
> Given:
> Time T1:
> worker 1:
>    task1:  counts{(one,10),(two,21)}
>    task2:  counts{(three,5),(four,2)}
>    task3:  counts{(five,8)}
>
> worker 2:
>    task1:  counts{(six,10),(seven,21)}
>    task2:  counts{(eight,5),(nine,2)}
>    task3:  counts{(ten,8)}
>
> Time T10 (after rebalancing, with 3 workers in place):
>
> worker 1:
>    task1:  counts{(one,10),(two,21)}
>    task2:  counts{(three,5),(four,2)}
>
> worker 2:
>    task1:  counts{(six,10),(seven,21)}
>    task2:  counts{(eight,5),(nine,2)}
>
> worker 3:
>    task1:  counts{(five,15)}
>    task2:  counts{(ten,25)}
>
> For worker 3, task 1: the value 15 should come from the previous value 8
> (computed by w1/t3) plus 7 newly counted occurrences.
> For worker 3, task 2: the value 25 should come from the previous value 8
> (computed by w2/t3) plus 17 newly counted occurrences.

Re: Rebalance and Distributed load on the workers

Posted by Spico Florin <sp...@gmail.com>.
Hello!
  Thank you for your responses. The documentation states that when
a worker dies, "the supervisor will restart it. If it continuously fails
on startup and is unable to heartbeat to Nimbus, Nimbus will reassign the
worker to another machine."
  1. What happens if Nimbus cannot start a new worker on another machine
(for example, if all the machines are heavily loaded and there are no free
physical resources)? (See the storm.yaml note after these questions.)
  2. Suppose that our topology was started with 4 workers and one worker
dies and cannot be restarted anywhere (neither on the same machine nor on
a different one), leaving us with only 3 workers. In this case, will the
workload be redistributed automatically, or do we have to rebalance?
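(Note on question 1: as far as I understand, the number of workers a
machine can host is fixed by the supervisor's slot list in storm.yaml, so a
worker can only be reassigned to a machine with a free slot. The port
values below are only illustrative:

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

Each listed port is one worker slot on that supervisor.)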
I look forward to your answers.
  Thanks,
  Florin


On Wed, Jan 15, 2014 at 1:44 AM, Kyle Nusbaum <kn...@yahoo-inc.com> wrote:

>   Yes. The new workers will receive tuples.
>
>  It is up to you to keep track of your data. Workers may die and be
> brought back up at any time without too much fuss, so it's not a good idea
> to store critical information in them. Write it out somewhere else, but
> don't store it inside the bolt itself if you care about it being lost.
>
>  Part of rebalancing the topology involves killing existing workers.
>
>  So the short answer is that Storm doesn't do anything for you to try to
> keep your bolts consistent.
>
>   -- Kyle

Re: Rebalance and Distributed load on the workers

Posted by Kyle Nusbaum <kn...@yahoo-inc.com>.
Yes. The new workers will receive tuples.

It is up to you to keep track of your data. Workers may die and be brought back up at any time without too much fuss, so it's not a good idea to store critical information in them. Write it out somewhere else, but don't store it inside the bolt itself if you care about it being lost.

Part of rebalancing the topology involves killing existing workers.

So the short answer is that Storm doesn't do anything for you to try to keep your bolts consistent.
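
A minimal sketch of the "write it out somewhere else" approach, assuming a
local Redis server and the Jedis client on the classpath (both are
assumptions; any external store works the same way):

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;

import java.util.Map;

public class PersistentWordCount extends BaseBasicBolt {
    private transient Jedis jedis;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // (Re)connect each time the worker starts; a killed and
        // restarted worker picks the counts back up from Redis.
        jedis = new Jedis("localhost", 6379);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        // Increment in the external store instead of a worker-local
        // HashMap, so the count survives worker death and rebalancing.
        long count = jedis.hincrBy("word-counts", word, 1);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

Note the sketch gives at-least-once counts: a replayed tuple increments
twice unless you also deduplicate.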

-- Kyle
