Posted to user@storm.apache.org by Spico Florin <sp...@gmail.com> on 2014/01/14 14:41:40 UTC


Hello!
  I'm a newbie in Storm and I have some questions about scaling the
number of workers/executors in a cluster and about how data is handled
correctly between them.
   In the WordCountTopology, the WordCount bolt is used to count the
words from a text. From my observations and understanding, each task
keeps an internal Map holding the counts of the words that have
arrived at that task.

public static class WordCount extends BaseBasicBolt {
    // each task instance of this bolt keeps its own in-memory counts
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String word = tuple.getString(0);
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      count++;
      counts.put(word, count);
      collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count"));
    }
}


 From the configuration of the topology:

builder.setBolt("count", new WordCount(), 6).fieldsGrouping("split", new
Fields("word"));

I understand that, because of the fields grouping on "word", each task
will only receive words it has already seen before (or brand-new ones),
so the counts stay consistent: no task will receive a word that was
already processed by a different task.
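
To make this concrete, here is a minimal sketch of the routing idea
behind fields grouping (only an illustration, not Storm's actual
internal code): the grouping fields are hashed and mapped onto the
fixed list of tasks, so the same word always lands on the same task as
long as the number of tasks does not change.

public class FieldsGroupingSketch {
    // map a word onto one of numTasks tasks in a stable way
    static int chooseTask(String word, int numTasks) {
        return ((word.hashCode() % numTasks) + numTasks) % numTasks;
    }

    public static void main(String[] args) {
        int numTasks = 6; // the parallelism hint used for the "count" bolt
        System.out.println(chooseTask("one", numTasks)); // same task every time
        System.out.println(chooseTask("one", numTasks)); // ... for the same word
        System.out.println(chooseTask("two", numTasks)); // possibly another task
    }
}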

Now suppose that:
- you have a huge text with a small number of distinct words (let's say
you have 10MB of text containing only the words one, two, three, four,
five, six, seven, eight, nine, ten)
- you start the topology with 2 workers
- at some moment in time (after all the words have been distributed to
the tasks and already have counts), we add one more worker, for example
with the rebalance command shown below.
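
For reference, rebalancing can be triggered from the command line,
assuming the topology was submitted under the name "word-count" (the
name here is just an example):

storm rebalance word-count -n 3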
Here are my questions:
1. When we rebalance the topology, will the newly added worker get
data?
2. If the new worker does get data, how are the word counts kept
consistent, since it will receive some already-processed words? Are the
count values for the already-processed words passed to the newly
created worker?
3. Given that I don't have a remote cluster installed, are my
assumptions correct?

I look forward to your answers and opinions.

Thanks.

Regards,
  Florin

Example.
Given:
Time T1:
worker 1:
   task1:  counts{(one,10),(two,21)}
   task2:  counts{(three,5),(four,2)}
   task3:  counts{(five,8)}

worker 2:
   task1:  counts{(six,10),(seven,21)}
   task2:  counts{(eight,5),(nine,2)}
   task3:  counts{(ten,8)}

Time T10: (after rebalancing to 3 workers)

worker 1:
   task1:  counts{(one,10),(two,21)}
   task2:  counts{(three,5),(four,2)}
worker 2:
   task1:  counts{(six,10),(seven,21)}
   task2:  counts{(eight,5),(nine,2)}

worker 3:
   task1:  counts{(five,15)}
   task2:  counts{(ten,25)}

For worker 3, task 1: the value 15 should come from the previous value
8 (computed by w1/t3) plus 7 newly counted occurrences.
For worker 3, task 2: the value 25 should come from the previous value
8 (computed by w2/t3) plus 17 newly counted occurrences.

Re:

Posted by Abhishek Bhattacharjee <ab...@gmail.com>.
It is a good question. I think it is related to groupings. You should
study the different types of groupings (see the short sketch below) to
understand the problem fully.
Let me know if this helps. You can refer to this book:
http://shop.oreilly.com/product/0636920024835.do
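
For example, the grouping is chosen when the bolt is wired into the
topology. The two alternative wirings below (you would use one or the
other) reuse the component names from the post above: fields grouping
keeps each word on a single task, while shuffle grouping would scatter
the counts for one word across several tasks.

// fields grouping: all tuples with the same "word" go to the same task
builder.setBolt("count", new WordCount(), 6)
       .fieldsGrouping("split", new Fields("word"));

// shuffle grouping: tuples are distributed randomly across tasks, so
// the counts for one word would end up split over several maps
builder.setBolt("count", new WordCount(), 6)
       .shuffleGrouping("split");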





-- 
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*