Posted to user@storm.apache.org by Raphael Hsieh <ra...@gmail.com> on 2014/04/21 21:57:41 UTC

PersistentAggregate across batches

If I am using an opaque spout and doing a persistent aggregate to a
MemcachedState, how does it aggregate/increment the values across all
batches?

I want to implement an IBackingMap so that I can use an external
datastore. However, I'm unsure where the logic goes that reads the
previous data and aggregates it with the new data.

From what I've been told, I need to implement IBackingMap and its
multiPut/multiGet functions. So logically, I think it makes sense to
put this update logic in the multiPut function. However, the
OpaqueMap class already has multiGet logic in order to check the TxId of
the batch.
Instead of using the OpaqueMap class, should I just write my own
implementation?

Thanks
-- 
Raphael Hsieh

Re: PersistentAggregate across batches

Posted by Raphael Hsieh <ra...@gmail.com>.
Are you saying that there is no point in doing a "groupBy" followed by a
persistentAggregate? The documentation states: "If you run aggregators on
a grouped stream, the aggregation will be run within each group instead of
against the whole batch."


On Wed, Apr 23, 2014 at 2:17 AM, Danijel Schiavuzzi <da...@schiavuzzi.com> wrote:

>> When I do something like
>>
>> Stream
>>     .groupBy(new Fields("a"))
>>     .persistentAggregate(new MyStateFactory(), new Fields("a", "b", "c", "d"),
>>         new MyAggregator(), new Fields("resultMap"))
>>
>> What happens (as described here<https://github.com/nathanmarz/storm/wiki/Trident-API-Overview>)
>> is the stream is split into different groups based on field "a":
>
> This is not true. The stream will be grouped based on all the keys you
> specified in persistentAggregate, i.e. new Fields("a", "b", "c", "d"). This
> will produce as many GroupedStreams as there are distinct groupings among
> those keys. Those groupings will then be combined/reduced with the existing
> values gathered from IBackingMap#multiGet(), and Trident will then call
> multiPut() to persist the updated aggregations back to the underlying data
> store.
>
> Take a look at the Storm sources under the package "storm.trident.*". A
> good starting point for understanding Trident would be the Java class
> "storm.trident.state.map.TransactionalMap" (or OpaqueMap or
> NonTransactionalMap).
>
> Danijel Schiavuzzi
> www.schiavuzzi.com



-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014

Re: PersistentAggregate across batches

Posted by Danijel Schiavuzzi <da...@schiavuzzi.com>.
> When I do something like
>
> Stream
>     .groupBy(new Fields("a"))
>     .persistentAggregate(new MyStateFactory(), new Fields("a", "b", "c", "d"),
>         new MyAggregator(), new Fields("resultMap"))
>
> What happens (as described here<https://github.com/nathanmarz/storm/wiki/Trident-API-Overview>)
> is the stream is split into different groups based on field "a":
>

This is not true. The stream will be grouped based on all the keys you
specified in persistentAggregate, i.e. new Fields("a", "b", "c", "d"). This
will produce as many GroupedStreams as there are distinct groupings among
those keys. Those groupings will then be combined/reduced with the existing
values gathered from IBackingMap#multiGet(), and Trident will then call
multiPut() to persist the updated aggregations back to the underlying data
store.

Take a look at the Storm sources under the package "storm.trident.*". A
good starting point for understanding Trident would be the Java class
"storm.trident.state.map.TransactionalMap" (or OpaqueMap or
NonTransactionalMap).

Danijel Schiavuzzi
www.schiavuzzi.com








-- 
Danijel Schiavuzzi

E: danijel@schiavuzzi.com
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7

Re: PersistentAggregate across batches

Posted by Raphael Hsieh <ra...@gmail.com>.
The previous link didn't work; this one should:
https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#operations-on-grouped-streams


On Tue, Apr 22, 2014 at 10:30 AM, Raphael Hsieh <ra...@gmail.com> wrote:

> Yes, partially.
> The part I was missing was getting the old values and feeding them through
> the aggregator again, which still doesn't quite make sense to me.
>
> I am using an external datastore, so I am not able to use the vanilla
> MemcachedState, which is why I am implementing my own version of
> IBackingMap.
>
> So let me try to explain my understanding.
> When I do something like
>
> Stream
>     .groupBy(new Fields("a"))
>     .persistentAggregate(new MyStateFactory(), new Fields("a", "b", "c", "d"),
>         new MyAggregator(), new Fields("resultMap"))
>
> what happens (as described here<https://github.com/nathanmarz/storm/wiki/Trident-API-Overview>)
> is that the stream is split into different groups based on field "a":
> [image: Grouping]
> Then, partitionPersist will run a multiGet on the fields ("a", "b", "c",
> "d"), since that is what we are using as our keys. So in each of the
> "groups" described above, we would have not only the raw tuples resulting
> from the grouping, but also a single tuple with the result of the previous
> aggregation.
> These would all be run through the aggregator, which should be able to
> handle aggregating with this semi-complete aggregation (the "reduce"
> function in a ReducerAggregator, or the "combine" function in a
> CombinerAggregator).
>
> How does it know not to treat the previous aggregation as a single new
> tuple (and hence not to run the "init" function on it)? For example, if I
> were aggregating a count, having that previous value (say 60) as a single
> extra tuple would only increment the count by 1, instead of 60.
> Would I then just need to implement my own "init" function so that it
> checks whether the tuple value is a raw new tuple or a previous
> aggregation?



-- 
Raphael Hsieh

Re: PersistentAggregate across batches

Posted by Raphael Hsieh <ra...@gmail.com>.
Yes, partially.
The part I was missing was getting the old values and feeding them through
the aggregator again, which still doesn't quite make sense to me.

I am using an external datastore, so I am not able to use the vanilla
MemcachedState, which is why I am implementing my own version of
IBackingMap.

So let me try to explain my understanding.
When I do something like

Stream
    .groupBy(new Fields("a"))
    .persistentAggregate(new MyStateFactory(), new Fields("a", "b", "c", "d"),
        new MyAggregator(), new Fields("resultMap"))

what happens (as described
here<https://github.com/nathanmarz/storm/wiki/Trident-API-Overview>)
is that the stream is split into different groups based on field "a":
[image: Grouping]
Then, partitionPersist will run a multiGet on the fields ("a", "b", "c",
"d"), since that is what we are using as our keys. So in each of the
"groups" described above, we would have not only the raw tuples resulting
from the grouping, but also a single tuple with the result of the previous
aggregation.
These would all be run through the aggregator, which should be able to
handle aggregating with this semi-complete aggregation (the "reduce"
function in a ReducerAggregator, or the "combine" function in a
CombinerAggregator).

How does it know not to treat the previous aggregation as a single new
tuple (and hence not to run the "init" function on it)? For example, if I
were aggregating a count, having that previous value (say 60) as a single
extra tuple would only increment the count by 1, instead of 60.
Would I then just need to implement my own "init" function so that it
checks whether the tuple value is a raw new tuple or a previous
aggregation?


On Tue, Apr 22, 2014 at 9:59 AM, Cody A. Ray <co...@gmail.com> wrote:

> My understanding is that the process is:
> 1. multiGet on the IBackingMap is called and returns a value for each key
>    (or null if not present).
> 2. For each key, the old value from the get and the new values in the batch
>    are fed through the aggregator to produce one value per key.
> 3. This value is then stored back into the state through multiPut on the
>    IBackingMap.
>
> If you just want to use nathanmarz's trident-memcached integration, you
> don't have to write an IBackingMap yourself. The MemcachedState itself
> implements IBackingMap to do the get and put. To use it, just decide what
> you want to groupBy (these become your keys) and how you want it aggregated
> (this is the reducer/combiner implementation). You don't have to write the
> memcached connection logic or the aggregation logic yourself unless you want
> to change how it's aggregated or stored.
> I've not used the trident-memcached state in particular, but in general
> this would look something like this:
>
> topology.newStream("spout1", spout1)
>   .groupBy(new Fields("mykeyfield"))
>   .persistentAggregate(MemcachedState.opaque(servers),
>       new Fields("myvaluefield"), new Sum(), new Fields("sum"));
>
> (Sorry for any code errors; writing on my phone)
>
> Does that answer your question?
>
> -Cody


-- 
Raphael Hsieh

Re: PersistentAggregate across batches

Posted by "Cody A. Ray" <co...@gmail.com>.
My understanding is that the process is:
1. multiGet on the IBackingMap is called and returns a value for each key
   (or null if not present).
2. For each key, the old value from the get and the new values in the batch
   are fed through the aggregator to produce one value per key.
3. This value is then stored back into the state through multiPut on the
   IBackingMap.
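
The three steps above can be sketched in plain Java as follows. This is a minimal illustration under stated assumptions, not Trident's source: BackingMap, InMemoryBackingMap, and applyBatch are hypothetical names that only mirror the multiGet/multiPut shape discussed in this thread, keys are plain strings, and the aggregation is hard-coded to a sum.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the read-aggregate-write cycle.
// None of these types come from Storm; they only mirror the shape of the idea.
final class BatchUpdateSketch {

    // Store with the two bulk operations discussed in the thread.
    interface BackingMap<T> {
        List<T> multiGet(List<String> keys);
        void multiPut(List<String> keys, List<T> vals);
    }

    // In-memory implementation; a real one would talk to an external datastore.
    static final class InMemoryBackingMap implements BackingMap<Long> {
        final Map<String, Long> store = new HashMap<>();

        @Override
        public List<Long> multiGet(List<String> keys) {
            List<Long> out = new ArrayList<>();
            for (String key : keys) {
                out.add(store.get(key)); // null when the key is absent
            }
            return out;
        }

        @Override
        public void multiPut(List<String> keys, List<Long> vals) {
            for (int i = 0; i < keys.size(); i++) {
                store.put(keys.get(i), vals.get(i));
            }
        }
    }

    // Folds one batch of per-key partial sums into the stored values.
    static void applyBatch(BackingMap<Long> map, Map<String, Long> batchSums) {
        List<String> keys = new ArrayList<>(batchSums.keySet());
        List<Long> old = map.multiGet(keys);                 // step 1: read
        List<Long> updated = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {              // step 2: aggregate
            long previous = (old.get(i) == null) ? 0L : old.get(i);
            updated.add(previous + batchSums.get(keys.get(i)));
        }
        map.multiPut(keys, updated);                         // step 3: write back
    }

    public static void main(String[] args) {
        InMemoryBackingMap map = new InMemoryBackingMap();

        Map<String, Long> batch1 = new HashMap<>();
        batch1.put("a", 3L);
        batch1.put("b", 1L);
        applyBatch(map, batch1);

        Map<String, Long> batch2 = new HashMap<>();
        batch2.put("a", 2L);
        applyBatch(map, batch2);

        System.out.println("a=" + map.store.get("a") + ", b=" + map.store.get("b"));
    }
}
```

After the two batches, key "a" holds 5 and key "b" holds 1. Note that the backing map itself contains no aggregation logic; it only reads and writes, which is why the update does not belong in multiPut.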

If you just want to use nathanmarz's trident-memcached integration, you
don't have to write an IBackingMap yourself. The MemcachedState itself
implements IBackingMap to do the get and put. To use it, just decide what
you want to groupBy (these become your keys) and how you want it aggregated
(this is the reducer/combiner implementation). You don't have to write the
memcached connection logic or the aggregation logic yourself unless you want
to change how it's aggregated or stored.
I've not used the trident-memcached state in particular, but in general
this would look something like this:

topology.newStream("spout1", spout1)
  .groupBy(new Fields("mykeyfield"))
  .persistentAggregate(MemcachedState.opaque(servers),
      new Fields("myvaluefield"), new Sum(), new Fields("sum"));

(Sorry for any code errors; writing on my phone)

Does that answer your question?

-Cody
On Apr 22, 2014 10:32 AM, "Raphael Hsieh" <ra...@gmail.com> wrote:

> The Reducer/Combiner Aggregators hold the logic to aggregate across
> an entire batch; however, they do not have the logic to aggregate between
> batches.
> For that to happen, something must read the previous TransactionId and
> value from the datastore, determine whether the incoming data is in the
> right sequence, and then increment the value within the datastore.
>
> I am asking about this second part: where does the logic go that reads the
> previous data from the datastore and adds it to the new incoming aggregate
> data?

Re: PersistentAggregate across batches

Posted by Raphael Hsieh <ra...@gmail.com>.
The Reducer/Combiner Aggregators hold the logic to aggregate across an
entire batch; however, they do not have the logic to aggregate between
batches.
For that to happen, something must read the previous TransactionId and
value from the datastore, determine whether the incoming data is in the
right sequence, and then increment the value within the datastore.

I am asking about this second part: where does the logic go that reads the
previous data from the datastore and adds it to the new incoming aggregate
data?
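
For reference, the transaction-id check described here can be sketched for a single key as follows. This is only a minimal sketch of the opaque-transactional idea (store the last txid, the current value, and the previous value per key), assuming a numeric sum; OpaqueUpdateSketch, OpaqueValue, and apply are hypothetical names and are not taken from the Storm sources.

```java
// Hypothetical sketch of an opaque-style update for a single key; the class
// and method names are illustrative and not taken from Storm.
final class OpaqueUpdateSketch {

    // What is stored per key: the txid of the last batch applied, the value
    // after that batch, and the value before it.
    static final class OpaqueValue {
        final long txid;
        final long curr;
        final long prev;

        OpaqueValue(long txid, long curr, long prev) {
            this.txid = txid;
            this.curr = curr;
            this.prev = prev;
        }
    }

    // Applies a batch's partial sum to the stored value (null if the key is new).
    static OpaqueValue apply(OpaqueValue stored, long batchTxid, long batchSum) {
        long base;
        if (stored == null) {
            base = 0L;                      // first write for this key
        } else if (stored.txid == batchTxid) {
            base = stored.prev;             // replayed batch: redo from the older value
        } else if (stored.txid < batchTxid) {
            base = stored.curr;             // new batch: build on the latest value
        } else {
            throw new IllegalStateException("batch txid is older than stored txid");
        }
        return new OpaqueValue(batchTxid, base + batchSum, base);
    }

    public static void main(String[] args) {
        OpaqueValue v = apply(null, 1L, 5L);   // txid 1 adds 5
        v = apply(v, 2L, 3L);                  // txid 2 adds 3
        v = apply(v, 2L, 4L);                  // txid 2 replayed with different contents
        System.out.println(v.txid + " " + v.curr + " " + v.prev);
    }
}
```

In this sketch the replay of txid 2 discards the earlier attempt (5 + 3) and recomputes from the previous value, giving 5 + 4 = 9. A layer of this kind sits between the aggregator and the raw store, so a custom IBackingMap would only need to persist and return the three fields.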


On Mon, Apr 21, 2014 at 6:58 PM, Cody A. Ray <co...@gmail.com> wrote:

> It's the ReducerAggregator/CombinerAggregator's job to implement this logic.
> Look at Count and Sum, which are built into Trident. You can also implement
> your own aggregator.
>
> -Cody



-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014

Re: PersistentAggregate across batches

Posted by "Cody A. Ray" <co...@gmail.com>.
It's the ReducerAggregator/CombinerAggregator's job to implement this logic.
Look at Count and Sum, which are built into Trident. You can also implement
your own aggregator.
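
As a rough illustration of how a Count-style combiner can fold a previously stored value into a new batch, here is a self-contained sketch. The Combiner interface below is a hypothetical mirror of the init/combine/zero shape of a combiner aggregator, with a plain Object standing in for a tuple; aggregateBatch and mergeWithStored are made-up helper names, not Trident methods.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical mirror of a combiner-style aggregator; written for illustration,
// not copied from Storm. The Object parameter stands in for a tuple.
final class CountCombinerSketch {

    interface Combiner<T> {
        T init(Object tuple);        // value contributed by one raw tuple
        T combine(T left, T right);  // merges two partial results
        T zero();                    // identity, used when there is nothing to combine
    }

    // Count: every raw tuple contributes 1; partial counts are added.
    static final class Count implements Combiner<Long> {
        @Override public Long init(Object tuple) { return 1L; }
        @Override public Long combine(Long left, Long right) { return left + right; }
        @Override public Long zero() { return 0L; }
    }

    // Reduces the raw tuples of one batch to a single partial result.
    static <T> T aggregateBatch(Combiner<T> agg, List<?> tuples) {
        T acc = agg.zero();
        for (Object tuple : tuples) {
            acc = agg.combine(acc, agg.init(tuple)); // init runs on raw tuples only
        }
        return acc;
    }

    // Merges the batch result with the previously stored value (null if none).
    static <T> T mergeWithStored(Combiner<T> agg, T stored, T batchResult) {
        return (stored == null) ? batchResult : agg.combine(stored, batchResult);
    }

    public static void main(String[] args) {
        Count count = new Count();
        Long batchResult = aggregateBatch(count, Arrays.asList("t1", "t2", "t3"));
        Long updated = mergeWithStored(count, 60L, batchResult);
        System.out.println(updated);
    }
}
```

With a stored count of 60 and three new tuples, the result is 63 rather than 61: the stored value is never passed to init, it only enters through combine.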

-Cody


On Mon, Apr 21, 2014 at 2:57 PM, Raphael Hsieh <ra...@gmail.com> wrote:

> If I am using an opaque spout and doing a persistent aggregate to a
> MemcachedState, how does it aggregate/increment the values across all
> batches?
>
> I want to implement an IBackingMap so that I can use an external
> datastore. However, I'm unsure where the logic goes that reads the
> previous data and aggregates it with the new data.
>
> From what I've been told, I need to implement IBackingMap and its
> multiPut/multiGet functions. So logically, I think it makes sense to
> put this update logic in the multiPut function. However, the
> OpaqueMap class already has multiGet logic in order to check the TxId of
> the batch.
> Instead of using the OpaqueMap class, should I just write my own
> implementation?
>
> Thanks
> --
> Raphael Hsieh
>
>
>
>



-- 
Cody A. Ray, LEED AP
cody.a.ray@gmail.com
215.501.7891