Posted to user@storm.apache.org by Neil Carroll <ca...@hotmail.com> on 2014/04/08 20:42:23 UTC
Can Storm write an Aggregate Record to Postgres or SQL Server?
I'm new to Storm and want to use it to aggregate log data over a 5-minute period and write aggregate records (one per transaction type) into a DBMS (SQL Server or Postgres). I believe Storm can do this. Is there sample code available?
Thanks
Neil
RE: Can Storm write an Aggregate Record to Postgres or SQL Server?
Posted by Neil Carroll <ca...@hotmail.com>.
Many thanks!
From: cody.a.ray@gmail.com
Date: Tue, 8 Apr 2014 15:05:48 -0500
Subject: Re: Can Storm write an Aggregate Record to Postgres or SQL Server?
To: user@storm.incubator.apache.org
Can you elaborate on how you want to "aggregate" data? If each log entry is essentially a timestamp, a transaction type (since you mentioned this), and a numerical value (which you want to sum over the 5-minute window), then you don't need tick tuples.
The way we do aggregation is by mapping a timestamp into a bucket (e.g., your 5-minute window), grouping by the timestamp and transactionType, and using trident's persistentAggregate functionality to compute the sum.
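Concretely, mapping a timestamp into its bucket is just integer arithmetic on the epoch timestamp. A minimal sketch (an illustrative reconstruction, not the actual Bucket helper from the linked gist; assumes epoch-millisecond timestamps):

```java
// Sketch: map an epoch-millis timestamp to the [bucketStart, bucketEnd)
// boundaries of its fixed-length window (e.g. 5 minutes = 300,000 ms).
public class BucketExample {
    static long bucketStart(long timestampMillis, long bucketLengthMillis) {
        // Round down to the nearest bucket boundary.
        return timestampMillis - (timestampMillis % bucketLengthMillis);
    }

    static long bucketEnd(long timestampMillis, long bucketLengthMillis) {
        return bucketStart(timestampMillis, bucketLengthMillis) + bucketLengthMillis;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        long ts = 1396982543000L; // 2014-04-08 18:42:23 UTC
        System.out.println(bucketStart(ts, fiveMinutes)); // 1396982400000
        System.out.println(bucketEnd(ts, fiveMinutes));   // 1396982700000
    }
}
```

Every tuple whose timestamp falls in the same window gets the same bucketStart, so grouping on it collapses the window into one aggregate row.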
Something like this:
TridentTopology topology = new TridentTopology();
topology.newStream("spout2", spout)
    .each(new Fields("bytes"), new BinaryToString(), new Fields("string"))
    .each(new Fields("string"), new LogParser(), new Fields("timestamp", "transactionType", "value"))
    // entry.getValue() supplies the bucket length from the surrounding code (5 minutes here)
    .each(new Fields("timestamp"), new Bucket(entry.getValue()), new Fields("bucketStart", "bucketEnd"))
    .groupBy(new Fields("bucketStart", "transactionType"))
    .persistentAggregate(stateFactory, new Fields("value"), new Sum(), new Fields("count"));
return topology.build();
Of course, you have to write the LogParser yourself since it depends on the format of the input messages. This example assumes a Kafka spout which is why it starts by parsing a "bytes" field. You can see the various helper functions here: https://gist.github.com/codyaray/9897217.
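For illustration, here is what a LogParser's core logic might look like. The pipe-delimited "timestamp|transactionType|value" format is an assumption; adapt the split to your actual log format. In the topology this logic would live inside a storm.trident.operation.BaseFunction whose execute() emits the three parsed values; the parsing itself is factored out here so it stands alone:

```java
// Hypothetical parsing core for a LogParser Trident function.
// Assumes one log entry per line, pipe-delimited:
//   "1396982543000|purchase|42"
public class LogParserExample {
    // Parse one log line into [timestamp, transactionType, value].
    static String[] parseLine(String line) {
        String[] parts = line.split("\\|");
        if (parts.length != 3) {
            throw new IllegalArgumentException("unexpected log line: " + line);
        }
        return parts;
    }

    public static void main(String[] args) {
        String[] fields = parseLine("1396982543000|purchase|42");
        // In a BaseFunction you would emit these via
        // collector.emit(new Values(fields[0], fields[1], fields[2]));
        System.out.println(fields[0] + " " + fields[1] + " " + fields[2]);
    }
}
```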
-Cody
On Tue, Apr 8, 2014 at 2:03 PM, Huang, Roger <ro...@visa.com> wrote:
Neil,
Take a look at using “tick tuples”
http://nathanmarz.github.io/storm/doc/backtype/storm/Config.html#TOPOLOGY_TICK_TUPLE_FREQ_SECS
and the Storm RDBMS bolt https://github.com/nathanmarz/storm-contrib/tree/master/storm-rdbms
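For the tick-tuple route with a plain (non-Trident) bolt, the idea is: the bolt keeps aggregates in memory, and on each tick it flushes them to the database. Storm marks tick tuples with source component "__system" and stream "__tick" (Constants.SYSTEM_COMPONENT_ID / SYSTEM_TICK_STREAM_ID). A sketch, with the check factored out on plain strings so it stands alone (in a real bolt you would read these off the Tuple):

```java
// Sketch of tick-tuple detection for a flush-on-interval bolt.
public class TickTupleExample {
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    // True when the tuple is one of Storm's periodic tick tuples
    // rather than a normal data tuple from an upstream component.
    static boolean isTickTuple(String sourceComponent, String sourceStreamId) {
        return SYSTEM_COMPONENT_ID.equals(sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(sourceStreamId);
    }

    // In the bolt you would also override getComponentConfiguration() and
    // set Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS to 300 to receive a tick
    // every 5 minutes; execute() then flushes the in-memory aggregates
    // to Postgres/SQL Server on tick, and accumulates otherwise.
    public static void main(String[] args) {
        System.out.println(isTickTuple("__system", "__tick"));   // tick: flush
        System.out.println(isTickTuple("logSpout", "default"));  // data: accumulate
    }
}
```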
-Roger
--
Cody A. Ray, LEED AP
cody.a.ray@gmail.com
215.501.7891