Posted to user@storm.apache.org by Yuval Oren <yu...@n3twork.com> on 2014/10/13 18:35:27 UTC
Trident persistentAggregate join?
Hi there,
I’m trying to denormalize a user’s join date in a Trident topology. My input looks like this:
Field1  User ID  Event Date
======  =======  ==========
a       1        10/1
b       1        10/2
c       2        10/3
And the output I want is:
Field1  User ID  Event Date  Join Date
======  =======  ==========  =========
a       1        10/1        10/1
b       1        10/2        10/1
c       2        10/3        10/3
What is the right way to do this? If I use persistentAggregate and stateQuery, how can I be sure that the state is updated before the query happens? For example:
TridentState joinDates = input.persistentAggregate(…);
Stream desiredOutput = input.stateQuery(joinDates,…);
--
Yuval Oren
N3TWORK
Re: Trident persistentAggregate join?
Posted by Yuval Oren <yu...@n3twork.com>.
I ended up creating an adapter of sorts for this situation. It might be convenient to have a Stream.statefulEach(Fields keyFields, Fields inputFields, Combiner/ReducerAggregator agg, Fields outputFields) method for cases like this.
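To illustrate the idea behind such a statefulEach step, here is a minimal plain-Java sketch. It is not Trident code and not the adapter mentioned above: the class StatefulEachSketch, the helper statefulEach, and the EARLIEST combiner are hypothetical names, and it assumes that a user's join date is the earliest event date seen so far for that user ID. The point it shows is that updating the per-key state and reading it back happen in one step, so each output row carries the state as of that row.

```java
import java.time.MonthDay;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java illustration only; none of these names are Storm/Trident API.
class StatefulEachSketch {

    // Hypothetical combiner: keep the earlier of two dates
    // (assumed meaning of "join date").
    static final BinaryOperator<MonthDay> EARLIEST =
            (a, b) -> a.compareTo(b) <= 0 ? a : b;

    // For each (key, value) pair, fold the value into that key's state and
    // emit the state as it stands right after the update.
    static <K, V> List<V> statefulEach(List<K> keys, List<V> values,
                                       BinaryOperator<V> combiner) {
        if (keys.size() != values.size()) {
            throw new IllegalArgumentException("keys and values must have the same length");
        }
        Map<K, V> state = new HashMap<>();
        List<V> emitted = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            // merge() stores the value if the key is new,
            // otherwise it combines the stored value with the new one.
            emitted.add(state.merge(keys.get(i), values.get(i), combiner));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The three input rows from the question: user IDs and event dates.
        List<Integer> userIds = Arrays.asList(1, 1, 2);
        List<MonthDay> eventDates = Arrays.asList(
                MonthDay.of(10, 1), MonthDay.of(10, 2), MonthDay.of(10, 3));

        List<MonthDay> joinDates = statefulEach(userIds, eventDates, EARLIEST);
        for (int i = 0; i < userIds.size(); i++) {
            System.out.println("user " + userIds.get(i)
                    + " event " + eventDates.get(i)
                    + " join " + joinDates.get(i));
        }
    }
}
```

With this approach a row only sees events that were processed before it, so if a user's earliest event arrives late, rows emitted before it would carry a later join date. The in-memory HashMap also stands in for whatever persistent state a real topology would use.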