Posted to user@storm.apache.org by Ashish Soni <as...@gmail.com> on 2015/05/29 17:09:06 UTC

Global Count - Trident - Please help

Hi all,

I am trying to run a global count using Trident. When I use a parallelism
hint of 2, everything is counted twice. Please tell me what I am doing
wrong; the code and a sample data set are below.

I am just trying to count the number of calls made by a particular phone
number. When I do not specify the parallelism hint I get a count of 21, but
when I specify a parallelism of 2 I get a count of 42.


public static void main(String[] args) {
    TridentTopology topology = new TridentTopology();
    topology.newStream("cdrevent", new CSVSpout("testdata.csv", ',', false))
        .parallelismHint(2)
        .groupBy(new Fields("field_1"))
        .aggregate(new Fields("field_1"), new Count(), new Fields("count"))
        .each(new Fields("field_1", "count"), new Utils.PrintFilter());

    Config config = new Config();
    config.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("cdreventTopology", config, topology.build());

    backtype.storm.utils.Utils.sleep(10000);
    cluster.killTopology("cdreventTopology");
}


0,05051111111,0,05055555555,22/01/2014,22/01/2014,21:15:00,21:20:00,Local,20.0
1,05051111111,0,05055555555,13/01/2014,13/01/2014,18:00:00,18:05:00,Local,5.0
2,05051111111,0,05055555555,21/01/2014,21/01/2014,20:35:00,20:35:00,Local,0.0
3,05051111111,0,05055555555,11/02/2014,11/02/2014,00:00:00,00:00:00,Local,0.0
4,05051111111,0,05055555555,22/02/2014,22/02/2014,20:25:00,20:25:00,Local,0.0
5,05051111111,1,38722222222,20/01/2014,20/01/2014,18:20:00,18:22:00,Intl,50.0
6,05051111111,0,05055555555,15/01/2014,15/01/2014,01:25:00,01:25:00,Local,0.0
7,05051111111,0,08453350000,31/12/2013,31/12/2013,12:40:00,12:44:00,National,32.0
8,05051111111,1,05055555555,30/12/2013,30/12/2013,18:40:00,18:40:00,Local,0.0
9,05051111111,0,08453350000,10/01/2014,10/01/2014,13:10:00,13:14:00,National,32.0
10,05051111111,0,05055555555,17/02/2014,17/02/2014,17:50:00,17:50:00,Local,0.0
11,05051111111,0,08453350000,09/02/2014,09/02/2014,18:15:00,18:17:00,National,8.0
12,05051111111,1,05055555555,09/02/2014,09/02/2014,17:05:00,17:05:00,Local,0.0
13,05051111111,1,05055555555,15/02/2014,15/02/2014,18:45:00,18:45:00,Local,0.0
14,05051111111,0,08453350000,20/02/2014,20/02/2014,19:20:00,19:21:00,National,8.0
15,05051111111,1,08453350000,05/01/2014,05/01/2014,13:50:00,13:53:00,National,12.0
16,05051111111,1,07776122222,26/01/2014,26/01/2014,14:00:00,14:00:00,Mobile,0.0
17,05051111111,0,05055555555,04/02/2014,04/02/2014,12:30:00,12:35:00,Local,20.0
18,05051111111,1,05055555555,16/01/2014,16/01/2014,20:20:00,20:25:00,Local,20.0
19,05051111111,0,05055555555,23/02/2014,23/02/2014,16:55:00,17:07:00,Local,12.0
20,05051111111,0,05055555555,07/01/2014,07/01/2014,16:20:00,16:23:00,Local,12.0

Ashish

Re: Global Count - Trident - Please help

Posted by Ashish Soni <as...@gmail.com>.
Can you point me to the example? I do not understand what you mean
by partitioning the data accordingly.

If I have a 3-node Storm cluster and I want to do a global count, how will
it work? Please explain if possible.

Regards

On Fri, May 29, 2015 at 12:04 PM, Andrew Xor <an...@gmail.com>
wrote:

> My guess is that, as currently written, you spawn several tasks that each
> count the same thing, so you are effectively reading the data twice. I
> suspect that with a parallelism hint of 3, 4, and so on, you would get 21
> times that number. To do a global count you need to partition the data
> accordingly; please check the documentation, which has some useful
> examples of how to do that.
>
> Regards.
>
> On Fri, May 29, 2015 at 6:56 PM P. Taylor Goetz <pt...@gmail.com> wrote:
>
>> Try moving “.parallelismHint(2)” to after the groupBy.
>>
>> With the current placement (before the groupBy) Storm is creating two
>> instances of your spout, each outputting the same data set.
>>
>> -Taylor

Re: Global Count - Trident - Please help

Posted by Andrew Xor <an...@gmail.com>.
My guess is that, as currently written, you spawn several tasks that each
count the same thing, so you are effectively reading the data twice. I
suspect that with a parallelism hint of 3, 4, and so on, you would get 21
times that number. To do a global count you need to partition the data
accordingly; please check the documentation, which has some useful
examples of how to do that.

Regards.
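
The "21 times the parallelism hint" guess can be illustrated with a small
plain-Java simulation. This is only a sketch and uses no Storm classes: the
class DuplicateSpoutDemo and its methods are hypothetical names, and the 21
in-memory records stand in for testdata.csv. It assumes, as described in this
thread, that every spout instance reads the complete file.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (hypothetical names, not Storm API).
class DuplicateSpoutDemo {
    static final String CALLER = "05051111111";

    // Stand-in for testdata.csv: 21 records, all from the same caller.
    static List<String> sampleCallers() {
        List<String> callers = new ArrayList<>();
        for (int i = 0; i < 21; i++) {
            callers.add(CALLER);
        }
        return callers;
    }

    // Every simulated spout instance emits the whole data set, so the
    // downstream per-caller count sees each record once per instance.
    static Map<String, Long> countWithSpoutInstances(int instances) {
        Map<String, Long> counts = new HashMap<>();
        for (int s = 0; s < instances; s++) {
            for (String caller : sampleCallers()) {
                counts.merge(caller, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 3; n++) {
            System.out.println(n + " spout instance(s): " + countWithSpoutInstances(n));
        }
    }
}
```

Under these assumptions the per-caller count comes out as 21, 42, and 63 for
1, 2, and 3 instances, which matches the doubling reported in the original
post.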
On Fri, May 29, 2015 at 6:56 PM P. Taylor Goetz <pt...@gmail.com> wrote:

> Try moving “.parallelismHint(2)” to after the groupBy.
>
> With the current placement (before the groupBy) Storm is creating two
> instances of your spout, each outputting the same data set.
>
> -Taylor

Re: Global Count - Trident - Please help

Posted by "P. Taylor Goetz" <pt...@gmail.com>.
Try moving “.parallelismHint(2)” to after the groupBy.

With the current placement (before the groupBy) Storm is creating two instances of your spout, each outputting the same data set.

-Taylor
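
As a rough illustration of why the placement matters, here is a plain-Java
sketch (no Storm classes; GroupedParallelismDemo and its methods are
hypothetical names). It assumes a single reader that emits each record once
and then routes records by the hash of the grouping field to one of several
counters, which is roughly what grouping on field_1 with the parallelism hint
applied after the aggregation is expected to do. In the posted topology that
would presumably mean ending the chain with
groupBy(...).aggregate(...).parallelismHint(2) rather than placing the hint
directly after newStream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (hypothetical names, not Storm API).
class GroupedParallelismDemo {
    // Stand-in for testdata.csv: 21 records, all from the same caller.
    static List<String> sampleCallers() {
        return new ArrayList<>(Collections.nCopies(21, "05051111111"));
    }

    // The data set is read once; each record goes to exactly one partition,
    // chosen by the hash of its key, so equal keys always land together.
    static List<Map<String, Long>> countPartitioned(List<String> callers, int partitions) {
        List<Map<String, Long>> perPartition = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            perPartition.add(new HashMap<>());
        }
        for (String caller : callers) {
            int target = Math.floorMod(caller.hashCode(), partitions);
            perPartition.get(target).merge(caller, 1L, Long::sum);
        }
        return perPartition;
    }

    // Combine the per-partition counts into one result map.
    static Map<String, Long> merge(List<Map<String, Long>> perPartition) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partition : perPartition) {
            partition.forEach((key, value) -> total.merge(key, value, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        List<Map<String, Long>> parts = countPartitioned(sampleCallers(), 2);
        System.out.println("per partition: " + parts);
        System.out.println("merged: " + merge(parts));
    }
}
```

Because the input is read only once, the merged count for the caller stays at
21 regardless of how many counting partitions are used; only the work is
split, not the data duplicated.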

