You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Larry Palmer <la...@gmail.com> on 2014/05/02 16:57:59 UTC

How to do delimit persistent aggregate output

I'm kind of new to Trident, and was looking for some help making
persistentAggregates group values over a defined set of tuples.

Groupby() followed by a regular (combiner) aggregate accumulates values for
the duration of a batch, then outputs them all onto an output stream, which
is very convenient.

I'd like to make a persistentAggregate behave similarly, except that it
aggregates across batches, then outputs all its aggregates onto an output
stream when some external signal occurs, and then clears its database to
begin the next aggregation period.

One approach would be to have the spout put a "Done" flag in each tuple,
e.g. emit tuples like ("DIMENSION_1", "DIMENSION_2",
"METRIC_TO_BE_AGGREGATED", "DONE"), where it's doing .groupby(new
Fields("DIMENSION_1", "DIMENSION_2")), and the final tuple of each
aggregation period has the Done flag set, or maybe better, a final batch
would be sent consisting of a single tuple with null values for all the
other columns and the Done flag set to indicate all previous batches are
complete.

The problem with that is I'm not sure how to get the tuple with the "Done"
flag to go to all the partitions of the persistentAggregate - is there any
way to broadcast a specific tuple to all partitions?

Another approach would be to split another stream off the spout, that
watches for the "Done" flag and generates a state query when it sees it.
But it doesn't appear to be possible for a single state query to generate
more than a single tuple of output - it appears to want to be given a list
of keys to query for, and return the tuples for those keys - I don't see a
way for a state query to dump the whole DB.

Is there any way to do what I'm trying to do?

Re: How to do delimit persistent aggregate output

Posted by Larry Palmer <la...@gmail.com>.
Let me try again, since I feel like what I'm trying to do is exactly the
sort of application Storm/Trident is designed for, yet I can't find a
natural way to implement it.

The goal is very simple: data streams in from a spout (probably a
partitioned spout), in 5-minute (based on a timestamp in the data) bursts.
The spout knows when each burst has been completed. I expect every burst to
take 2-3 minutes to process. Each burst is maybe 50 million tuples, so far
more than one batch. I'd like the data to flow from the spout, be grouped
and aggregated by various combinations of fields, then have the 5-minute
aggregates flow through to a set of threshold filters which can compare the
values of each 5-minute aggregate to a threshold and generate alarms as
appropriate.

Of course I've read the section on Time Series in the Storm FAQ (
https://github.com/nathanmarz/storm/wiki/FAQ). I'm not trying to write the
5-minute aggregates to an external database that can be queried by some
external process; I'd like them to flow through Trident and do the
threshold/alarm processing from within it.


On Fri, May 2, 2014 at 10:57 AM, Larry Palmer <la...@gmail.com>wrote:

> I'm kind of new to Trident, and was looking for some help making
> persistentAggregates group values over a defined set of tuples.
>
> Groupby() followed by a regular (combiner) aggregate accumulates values
> for the duration of a batch, then outputs them all onto an output stream,
> which is very convenient.
>
> I'd like to make a persistentAggregate behave similarly, except that it
> aggregates across batches, then outputs all its aggregates onto an output
> stream when some external signal occurs, and then clears its database to
> begin the next aggregation period.
>
> One approach would be to have the spout put a "Done" flag in each tuple,
> e.g. emit tuples like ("DIMENSION_1", "DIMENSION_2",
> "METRIC_TO_BE_AGGREGATED", "DONE"), where it's doing .groupby(new
> Fields("DIMENSION_1", "DIMENSION_2")), and the final tuple of each
> aggregation period has the Done flag set, or maybe better, a final batch
> would be sent consisting of a single tuple with null values for all the
> other columns and the Done flag set to indicate all previous batches are
> complete.
>
> The problem with that is I'm not sure how to get the tuple with the "Done"
> flag to go to all the partitions of the persistentAggregate - is there any
> way to broadcast a specific tuple to all partitions?
>
> Another approach would be to split another stream off the spout, that
> watches for the "Done" flag and generates a state query when it sees it.
> But it doesn't appear to be possible for a single state query to generate
> more than a single tuple of output - it appears to want to be given a list
> of keys to query for, and return the tuples for those keys - I don't see a
> way for a state query to dump the whole DB.
>
> Is there any way to do what I'm trying to do?
>
>
>