Posted to user@storm.apache.org by Neil Carroll <ca...@hotmail.com> on 2014/04/08 20:42:23 UTC

Can Storm write an Aggregate Record to Postgres or SQL Server?

I'm new to Storm and want to use it to aggregate log data over a 5-minute period and write aggregate records (one per transaction type) into a DBMS (SQL Server or Postgres). I believe Storm can do this. Is there sample code available?
Thanks
 
Neil

RE: Can Storm write an Aggregate Record to Postgres or SQL Server?

Posted by Neil Carroll <ca...@hotmail.com>.
Many thanks! 
From: cody.a.ray@gmail.com
Date: Tue, 8 Apr 2014 15:05:48 -0500
Subject: Re: Can Storm write an Aggregate Record to Postgres or SQL Server?
To: user@storm.incubator.apache.org

Can you elaborate on how you want to "aggregate" data? If each log entry is essentially a timestamp, a transaction type (since you mentioned this), and a numerical value (which you want to sum over the 5-minute window), then you don't need tick tuples.


The way we do aggregation is by mapping each timestamp into a bucket (e.g., your 5-minute window), grouping by the bucket and transactionType, and using Trident's persistentAggregate functionality to compute the sum.


Something like this:

    TridentTopology topology = new TridentTopology();
    topology.newStream("spout2", spout)
      .each(new Fields("bytes"), new BinaryToString(), new Fields("string"))
      .each(new Fields("string"), new LogParser(), new Fields("timestamp", "transactionType", "value"))
      .each(new Fields("timestamp"), new Bucket(bucketSizeMs), new Fields("bucketStart", "bucketEnd")) // bucket size in millis ("entry.getValue()" in the original snippet); e.g. 5 * 60 * 1000
      .groupBy(new Fields("bucketStart", "transactionType"))
      .persistentAggregate(stateFactory, new Fields("value"), new Sum(), new Fields("count"));
    return topology.build();
Of course, you have to write the LogParser yourself since it depends on the format of the input messages. This example assumes a Kafka spout, which is why it starts by parsing a "bytes" field. You can see the various helper functions here: https://gist.github.com/codyaray/9897217.
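
For illustration, here is a rough, untested sketch of what LogParser and Bucket might look like (the space-delimited log format and the constructor argument are assumptions, not the actual helpers from the gist):

    import backtype.storm.tuple.Values;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.tuple.TridentTuple;

    // Hypothetical parser for lines like "1396989600000 purchase 42.0";
    // adapt the split/parse logic to your actual log format.
    public class LogParser extends BaseFunction {
      @Override
      public void execute(TridentTuple tuple, TridentCollector collector) {
        String[] parts = tuple.getString(0).split(" ");
        long timestamp = Long.parseLong(parts[0]);
        String transactionType = parts[1];
        double value = Double.parseDouble(parts[2]);
        collector.emit(new Values(timestamp, transactionType, value));
      }
    }

    // Maps a timestamp onto the start/end of its fixed-size window.
    public class Bucket extends BaseFunction {
      private final long bucketSizeMs; // e.g. 5 * 60 * 1000 for 5-minute windows

      public Bucket(long bucketSizeMs) {
        this.bucketSizeMs = bucketSizeMs;
      }

      @Override
      public void execute(TridentTuple tuple, TridentCollector collector) {
        long timestamp = tuple.getLong(0);
        long bucketStart = timestamp - (timestamp % bucketSizeMs);
        collector.emit(new Values(bucketStart, bucketStart + bucketSizeMs));
      }
    }

Every entry in the same 5-minute window then shares the same bucketStart, so the groupBy collapses them into one aggregate per (window, transaction type).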


-Cody


On Tue, Apr 8, 2014 at 2:03 PM, Huang, Roger <ro...@visa.com> wrote:

Neil,
Take a look at using "tick tuples"
http://nathanmarz.github.io/storm/doc/backtype/storm/Config.html#TOPOLOGY_TICK_TUPLE_FREQ_SECS

and the Storm RDBMS bolt: https://github.com/nathanmarz/storm-contrib/tree/master/storm-rdbms

-Roger

From: Neil Carroll [mailto:carroll_neil@hotmail.com]
Sent: Tuesday, April 08, 2014 1:42 PM
To: user@storm.incubator.apache.org
Subject: Can Storm write an Aggregate Record to Postgres or SQL Server?

I'm new to Storm and want to use it to aggregate log data over a 5-minute period and write aggregate records (one per transaction type) into a DBMS (SQL Server or Postgres). I believe Storm can do this. Is there sample code available?

Thanks

Neil

-- 
Cody A. Ray, LEED AP
cody.a.ray@gmail.com
215.501.7891

Re: Can Storm write an Aggregate Record to Postgres or SQL Server?

Posted by "Cody A. Ray" <co...@gmail.com>.
Can you elaborate on how you want to "aggregate" data? If each log entry is
essentially a timestamp, a transaction type (since you mentioned this), and
a numerical value (which you want to sum over the 5-minute window), then
you don't need tick tuples.

The way we do aggregation is by mapping each timestamp into a bucket (e.g.,
your 5-minute window), grouping by the bucket and transactionType, and
using Trident's persistentAggregate functionality to compute the sum.

Something like this:

    TridentTopology topology = new TridentTopology();
    topology.newStream("spout2", spout)
      .each(new Fields("bytes"), new BinaryToString(), new Fields("string"))
      .each(new Fields("string"), new LogParser(), new Fields("timestamp", "transactionType", "value"))
      .each(new Fields("timestamp"), new Bucket(bucketSizeMs), new Fields("bucketStart", "bucketEnd")) // bucket size in millis ("entry.getValue()" in the original snippet); e.g. 5 * 60 * 1000
      .groupBy(new Fields("bucketStart", "transactionType"))
      .persistentAggregate(stateFactory, new Fields("value"), new Sum(), new Fields("count"));
    return topology.build();
Of course, you have to write the LogParser yourself since it depends on the
format of the input messages. This example assumes a Kafka spout, which is
why it starts by parsing a "bytes" field. You can see the various helper
functions here: https://gist.github.com/codyaray/9897217.
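
The stateFactory is the piece that actually writes to the database: Trident just needs a StateFactory, so for Postgres you could back a Trident MapState with JDBC. Here is a rough, untested, non-transactional sketch (the class name, table schema, and upsert are assumptions for illustration; ON CONFLICT needs Postgres 9.5+, and a real version should batch statements and handle reconnects):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import backtype.storm.task.IMetricsContext;
    import storm.trident.state.State;
    import storm.trident.state.StateFactory;
    import storm.trident.state.map.IBackingMap;
    import storm.trident.state.map.NonTransactionalMap;

    // Illustrative only. Assumes a table like:
    //   CREATE TABLE agg (bucket_start BIGINT, txn_type TEXT,
    //     count DOUBLE PRECISION, PRIMARY KEY (bucket_start, txn_type));
    public class PostgresBackingMap implements IBackingMap<Number> {
      private final Connection conn;

      public PostgresBackingMap(String jdbcUrl) throws Exception {
        this.conn = DriverManager.getConnection(jdbcUrl);
      }

      // Keys are [bucketStart, transactionType]; values are the running sums.
      @Override
      public List<Number> multiGet(List<List<Object>> keys) {
        List<Number> vals = new ArrayList<Number>();
        try {
          PreparedStatement ps = conn.prepareStatement(
              "SELECT count FROM agg WHERE bucket_start = ? AND txn_type = ?");
          for (List<Object> key : keys) {
            ps.setLong(1, (Long) key.get(0));
            ps.setString(2, (String) key.get(1));
            ResultSet rs = ps.executeQuery();
            if (rs.next()) {
              vals.add(Double.valueOf(rs.getDouble(1)));
            } else {
              vals.add(null); // no row yet for this key
            }
            rs.close();
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
        return vals;
      }

      @Override
      public void multiPut(List<List<Object>> keys, List<Number> vals) {
        try {
          // Upsert the new aggregate value for each key.
          PreparedStatement ps = conn.prepareStatement(
              "INSERT INTO agg (bucket_start, txn_type, count) VALUES (?, ?, ?) "
              + "ON CONFLICT (bucket_start, txn_type) DO UPDATE SET count = EXCLUDED.count");
          for (int i = 0; i < keys.size(); i++) {
            ps.setLong(1, (Long) keys.get(i).get(0));
            ps.setString(2, (String) keys.get(i).get(1));
            ps.setDouble(3, vals.get(i).doubleValue());
            ps.executeUpdate();
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }

      public static class Factory implements StateFactory {
        private final String jdbcUrl;

        public Factory(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }

        @Override
        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
          try {
            // The connection is opened on the worker, so only the URL is serialized.
            return NonTransactionalMap.build(new PostgresBackingMap(jdbcUrl));
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
      }
    }

You would then pass new PostgresBackingMap.Factory("jdbc:postgresql://...") as the stateFactory above. For exactly-once counting you would want TransactionalMap and a txid column rather than NonTransactionalMap.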

-Cody


On Tue, Apr 8, 2014 at 2:03 PM, Huang, Roger <ro...@visa.com> wrote:

>  Neil,
>
> Take a look at using "tick tuples"
>
>
> http://nathanmarz.github.io/storm/doc/backtype/storm/Config.html#TOPOLOGY_TICK_TUPLE_FREQ_SECS
>
> and the Storm RDBMS bolt  https://github.com/nathanmarz/storm-contrib/tree/master/storm-rdbms
>
> -Roger
>
>
>
> From: Neil Carroll [mailto:carroll_neil@hotmail.com]
> Sent: Tuesday, April 08, 2014 1:42 PM
> To: user@storm.incubator.apache.org
> Subject: Can Storm write an Aggregate Record to Postgres or SQL Server?
>
>
>
> I'm new to Storm and want to use it to aggregate log data over a 5-minute
> period and write aggregate records (one per transaction type) into a DBMS
> (SQL Server or Postgres). I believe Storm can do this. Is there sample code
> available?
> Thanks
>
> Neil
>
>
>



-- 
Cody A. Ray, LEED AP
cody.a.ray@gmail.com
215.501.7891

RE: Can Storm write an Aggregate Record to Postgres or SQL Server?

Posted by "Huang, Roger" <ro...@visa.com>.
Neil,
Take a look at using "tick tuples"
http://nathanmarz.github.io/storm/doc/backtype/storm/Config.html#TOPOLOGY_TICK_TUPLE_FREQ_SECS

and the Storm RDBMS bolt: https://github.com/nathanmarz/storm-contrib/tree/master/storm-rdbms
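
To make the tick-tuple idea concrete, here is a rough, untested sketch (illustrative names only) of a bolt that accumulates per-type sums in memory and flushes them to the database whenever a tick tuple arrives; the database write itself is left as a placeholder:

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.Config;
    import backtype.storm.Constants;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;

    public class AggregatingBolt extends BaseBasicBolt {
      private Map<String, Double> sums = new HashMap<String, Double>();

      @Override
      public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        // Ask Storm to send this bolt a tick tuple every 300s (5 minutes).
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 300);
        return conf;
      }

      private static boolean isTickTuple(Tuple tuple) {
        return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
            && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
      }

      @Override
      public void execute(Tuple tuple, BasicOutputCollector collector) {
        if (isTickTuple(tuple)) {
          flushToDatabase(sums); // placeholder: JDBC upserts, or hand off to the RDBMS bolt
          sums = new HashMap<String, Double>();
        } else {
          String type = tuple.getStringByField("transactionType");
          double value = tuple.getDoubleByField("value");
          Double current = sums.get(type);
          sums.put(type, (current == null ? 0.0 : current.doubleValue()) + value);
        }
      }

      private void flushToDatabase(Map<String, Double> sums) {
        // Placeholder: write one row per transaction type for the closed window.
      }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This sketch emits nothing downstream.
      }
    }

The tradeoff: with tick tuples you manage the window boundaries and the flush yourself, which is more work but gives you full control over how and when rows hit the database.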
-Roger

From: Neil Carroll [mailto:carroll_neil@hotmail.com]
Sent: Tuesday, April 08, 2014 1:42 PM
To: user@storm.incubator.apache.org
Subject: Can Storm write an Aggregate Record to Postgres or SQL Server?

I'm new to Storm and want to use it to aggregate log data over a 5-minute period and write aggregate records (one per transaction type) into a DBMS (SQL Server or Postgres). I believe Storm can do this. Is there sample code available?
Thanks

Neil