Posted to user@storm.apache.org by Raphael Hsieh <ra...@gmail.com> on 2014/04/09 18:15:31 UTC

setting trident transaction window

I have been struggling to figure out how to get trident to aggregate data
over a certain time period then flush the data to an external data store.
The reasoning behind this is to reduce the number of reads and writes sent
to the database.

I've seen that Storm allows tick tuples to be inserted into the stream,
but I can't figure out how to do this with Trident. I had thought that
this functionality was added in Storm version 0.8.0. Is that the case?

One thing I tried was to create a new stream that emitted a tuple once
every X time period, and then merge this stream into my actual data
stream. However, doing this would result in a non-transactional stream,
which would be no good. It also didn't work: the resulting stream only
consisted of tuples from my clock stream.

Can anybody help me figure out how to have Trident aggregate data over a
certain time frame, flush it out to an external datastore, then rinse and
repeat?

There are some blogs out there about how to use a sliding window in
Storm, but I just want sequential windows in Trident.

Thanks

-- 
Raphael Hsieh

Re: setting trident transaction window

Posted by Raphael Hsieh <ra...@gmail.com>.
The documentation for Trident aggregations mentions that aggregations are
done globally across all batches:
https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#aggregation-operations
Is this incorrect?

When the documentation says "all batches", it does mean all batches across
all worker nodes, right?


On Thu, Apr 10, 2014 at 3:21 PM, Jason Jackson <ja...@gmail.com> wrote:

> stream.aggregate(...) computes aggregations per batch.
> stream.groupBy(..).aggregate(..) computes aggregations per key per batch.
>
> stream.persistentAggregate(..) computes partial aggregations per batch;
> the DB is updated with partial aggregates per batch.
> stream.groupBy(..).persistentAggregate(..) computes partial aggregations
> per batch per key; the DB is updated with partial aggregates per batch.
>
> You can also think of persistentAggregate as computing DB deltas and
> only sending the deltas to the DB.
>
> Strictly speaking, increasing the batch size is not guaranteed to reduce
> the total number of writes to the persistent store, but in practice it
> does. E.g. imagine our stream has 5 unique keys, you do a
> groupBy.persistentAggregate, and the throughput is 1B items/sec. If you
> have batches of 5B items, then after 10 seconds you've sent ~10 key/val
> updates to the DB; if you have batches of 0.5B items, then after 10
> seconds you've sent ~100 key/val updates to the DB.
>
> Some of the tradeoffs here are that if a batch fails you have to do more
> recomputation, and there's a greater chance for a batch to fail as there
> are more tuples per batch. That said, it should work, so definitely give
> it a shot.


-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014

Re: setting trident transaction window

Posted by Jason Jackson <ja...@gmail.com>.
stream.aggregate(...) computes aggregations per batch.
stream.groupBy(..).aggregate(..) computes aggregations per key per batch.

stream.persistentAggregate(..) computes partial aggregations per batch; the
DB is updated with partial aggregates per batch.
stream.groupBy(..).persistentAggregate(..) computes partial aggregations
per batch per key; the DB is updated with partial aggregates per batch.

You can also think of persistentAggregate as computing DB deltas and only
sending the deltas to the DB.
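
To make the "deltas" idea concrete, here is a toy model in plain Java. It
does not use any Storm classes, and DeltaSketch, partialCounts and
applyDelta are made-up names for illustration only. Each batch is first
reduced to per-key partial counts, and only those partial counts are
applied to the store, so the writes for a batch are bounded by the number
of distinct keys in it rather than by the number of tuples.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: plain Java, no Storm classes. All names are hypothetical.
class DeltaSketch {

    // Reduce one batch to per-key partial counts (the "delta" for that batch).
    static Map<String, Long> partialCounts(List<String> batch) {
        Map<String, Long> delta = new HashMap<>();
        for (String key : batch) {
            delta.merge(key, 1L, Long::sum);
        }
        return delta;
    }

    // Apply one batch's delta to the store and return the number of writes,
    // which is one per distinct key in the batch.
    static int applyDelta(Map<String, Long> db, Map<String, Long> delta) {
        int writes = 0;
        for (Map.Entry<String, Long> e : delta.entrySet()) {
            db.merge(e.getKey(), e.getValue(), Long::sum);
            writes++;
        }
        return writes;
    }

    public static void main(String[] args) {
        Map<String, Long> db = new HashMap<>();
        int writes = 0;
        writes += applyDelta(db, partialCounts(Arrays.asList("apple", "apple", "pear")));
        writes += applyDelta(db, partialCounts(Arrays.asList("apple", "pear", "pear")));
        // Six tuples were processed, but only four key/value updates were sent.
        System.out.println("apple=" + db.get("apple") + " pear=" + db.get("pear")
                + " writes=" + writes);
    }
}
```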

Strictly speaking, increasing the batch size is not guaranteed to reduce
the total number of writes to the persistent store, but in practice it
does. E.g. imagine our stream has 5 unique keys, you do a
groupBy.persistentAggregate, and the throughput is 1B items/sec. If you
have batches of 5B items, then after 10 seconds you've sent ~10 key/val
updates to the DB; if you have batches of 0.5B items, then after 10
seconds you've sent ~100 key/val updates to the DB.
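
The arithmetic can be checked with a few lines of Java. This is only a
back-of-the-envelope sketch over a 10 second window: it assumes every
batch contains all 5 keys and that each key costs exactly one update per
batch. WriteCountSketch and updates are hypothetical names.

```java
// Back-of-the-envelope model; all names are hypothetical.
class WriteCountSketch {

    // Key/value updates sent in a time window, assuming each completed batch
    // writes one update per distinct key.
    static long updates(long itemsPerSecond, long itemsPerBatch, long seconds, long distinctKeys) {
        long completedBatches = itemsPerSecond * seconds / itemsPerBatch;
        return completedBatches * distinctKeys;
    }

    public static void main(String[] args) {
        long throughput = 1_000_000_000L; // 1B items/sec
        // Batches of 5B items: 2 batches in 10 seconds.
        System.out.println(updates(throughput, 5_000_000_000L, 10, 5)); // prints 10
        // Batches of 0.5B items: 20 batches in 10 seconds.
        System.out.println(updates(throughput, 500_000_000L, 10, 5));   // prints 100
    }
}
```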

Some of the tradeoffs here are that if a batch fails you have to do more
recomputation, and there's a greater chance for a batch to fail as there
are more tuples per batch. That said, it should work, so definitely give
it a shot.






On Thu, Apr 10, 2014 at 1:33 PM, Raphael Hsieh <ra...@gmail.com> wrote:

> Thanks for your reply, Jason.
> So what I'm hearing is that there is no nice way of doing temporal flushes
> to a database. My main reason for wanting to do this is that I want to use
> DynamoDB for my external datastore, but it gets expensive. I would like to
> limit my reads and writes as much as I can so that the cost does not add
> up.
>
> Increasing the batch size seems like the best solution so far. However,
> from my understanding, doing an aggregation in Storm/Trident does a global
> aggregation, so do batch sizes really make a difference? Or is my
> understanding of the aggregation process wrong? I had thought that
> aggregating is global among all partitions (and Storm nodes).

Re: setting trident transaction window

Posted by Raphael Hsieh <ra...@gmail.com>.
Thanks for your reply, Jason.
So what I'm hearing is that there is no nice way of doing temporal flushes
to a database. My main reason for wanting to do this is that I want to use
DynamoDB for my external datastore, but it gets expensive. I would like to
limit my reads and writes as much as I can so that the cost does not add
up.

Increasing the batch size seems like the best solution so far. However,
from my understanding, doing an aggregation in Storm/Trident does a global
aggregation, so do batch sizes really make a difference? Or is my
understanding of the aggregation process wrong? I had thought that
aggregating is global among all partitions (and Storm nodes).


On Thu, Apr 10, 2014 at 1:58 AM, Jason Jackson <ja...@gmail.com> wrote:

> Trident doesn't expose tick tuples in its API yet, even though they were
> added to Storm a while ago.
>
> There are two problems I think you're talking about: (1) windowed
> aggregations and (2) reducing DB load.
>
> For (1):
> Trident can do aggregations at the batch level, but this doesn't really
> help you with aggregations over a range of timestamps. The way to do that
> is to include the time bucket in your key when you call
> persistentAggregate. E.g. your key could be "apple-2014-01-02-12:40" for
> minutely buckets. Then when serving the data you would query all keys
> across the time range. Certain databases such as Cassandra can make this
> query very fast.
>
> For (2):
> You'll need to implement your own IBackingMap persistent store plugin and
> pass it to persistentAggregate. Look at other examples such as
> trident-memcached for how to implement these. For your custom persistent
> store plugin you could use a combination of an in-memory map and the DB.
> 4 out of 5 batches would just commit their state updates to the in-memory
> map. The 5th batch would commit to the in-memory map, and then flush that
> map to the database. You could even launch a separate thread to do the
> flushing, in case it takes a while. This design, however, is not going to
> give you exactly-once semantics: if you lose the in-memory map, for
> example because your worker died, then when it comes back online it will
> still resume from the last successful batch (not the last flushed batch).
>
> To retain exactly-once semantics you could instead make your batch sizes
> much larger. By default they read 1MB from each Kafka partition (see the
> bufferSize and fetchSize configuration options in the Kafka spout). If you
> increased the batch size, and you're doing some kind of key-based
> aggregation, then this would reduce the total number of writes you would
> need to do to your persistent storage.
>
> Trident could definitely be improved here, so your mileage may vary.


-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014

Re: setting trident transaction window

Posted by Jason Jackson <ja...@gmail.com>.
Trident doesn't expose tick tuples in its API yet, even though they were
added to Storm a while ago.

There are two problems I think you're talking about: (1) windowed
aggregations and (2) reducing DB load.

For (1):
Trident can do aggregations at the batch level, but this doesn't really
help you with aggregations over a range of timestamps. The way to do that
is to include the time bucket in your key when you call
persistentAggregate. E.g. your key could be "apple-2014-01-02-12:40" for
minutely buckets. Then when serving the data you would query all keys
across the time range. Certain databases such as Cassandra can make this
query very fast.
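
As a rough illustration of the time-bucket idea, the helper below builds a
key of the form key-yyyy-MM-dd-HH:mm from an event timestamp using
java.time. TimeBucketKey and bucketKey are hypothetical names, and using
UTC is an assumption. In a topology, logic like this would presumably live
in a function that emits the bucketed key ahead of the groupBy.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Storm or Trident.
class TimeBucketKey {

    // Minute resolution; formatting in UTC is an assumption of this sketch.
    private static final DateTimeFormatter MINUTE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd-HH:mm").withZone(ZoneOffset.UTC);

    // Combine the logical key with the minute the event falls into, so all
    // events for the same key and minute aggregate into the same entry.
    static String bucketKey(String key, Instant eventTime) {
        return key + "-" + MINUTE.format(eventTime);
    }

    public static void main(String[] args) {
        Instant first = Instant.parse("2014-01-02T12:40:05Z");
        Instant second = Instant.parse("2014-01-02T12:40:59Z");
        System.out.println(bucketKey("apple", first));  // prints apple-2014-01-02-12:40
        System.out.println(bucketKey("apple", second)); // prints apple-2014-01-02-12:40
    }
}
```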

For (2):
You'll need to implement your own IBackingMap persistent store plugin and
pass it to persistentAggregate. Look at other examples such as
trident-memcached for how to implement these. For your custom persistent
store plugin you could use a combination of an in-memory map and the DB.
4 out of 5 batches would just commit their state updates to the in-memory
map. The 5th batch would commit to the in-memory map, and then flush that
map to the database. You could even launch a separate thread to do the
flushing, in case it takes a while. This design, however, is not going to
give you exactly-once semantics: if you lose the in-memory map, for
example because your worker died, then when it comes back online it will
still resume from the last successful batch (not the last flushed batch).
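
A minimal sketch of the "flush every Nth batch" design follows. It is
plain Java with no Storm dependency: BackingMap is a local stand-in with
the same multiGet/multiPut shape as IBackingMap, and InMemoryDb,
BufferedBackingMap and flushEvery are hypothetical names. It assumes
multiPut is called once per batch, flushes synchronously rather than on a
separate thread, and has the same weakness described above: anything still
in the buffer is lost if the worker dies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Sketch only. Nothing here is Storm code; every name is hypothetical.
class BufferedFlushSketch {

    // Local stand-in with the same two-method shape as Trident's IBackingMap,
    // declared here so the sketch compiles without Storm on the classpath.
    interface BackingMap<T> {
        List<T> multiGet(List<List<Object>> keys);
        void multiPut(List<List<Object>> keys, List<T> vals);
    }

    // Fake database that counts how many write calls it receives.
    static class InMemoryDb<T> implements BackingMap<T> {
        final Map<List<Object>, T> rows = new HashMap<>();
        int writeCalls = 0;

        public List<T> multiGet(List<List<Object>> keys) {
            List<T> out = new ArrayList<>();
            for (List<Object> k : keys) out.add(rows.get(k));
            return out;
        }

        public void multiPut(List<List<Object>> keys, List<T> vals) {
            writeCalls++;
            for (int i = 0; i < keys.size(); i++) rows.put(keys.get(i), vals.get(i));
        }
    }

    // Buffers state updates in memory and flushes them on every Nth multiPut.
    static class BufferedBackingMap<T> implements BackingMap<T> {
        private final BackingMap<T> db;
        private final int flushEvery;
        private final Map<List<Object>, T> pending = new HashMap<>();
        private int putsSinceFlush = 0;

        BufferedBackingMap(BackingMap<T> db, int flushEvery) {
            this.db = db;
            this.flushEvery = flushEvery;
        }

        public List<T> multiGet(List<List<Object>> keys) {
            // Buffered values are newer than the database, so they win.
            List<List<Object>> missing = new ArrayList<>();
            for (List<Object> k : keys) if (!pending.containsKey(k)) missing.add(k);
            Iterator<T> fromDb = missing.isEmpty()
                    ? Collections.<T>emptyIterator()
                    : db.multiGet(missing).iterator();
            List<T> result = new ArrayList<>();
            for (List<Object> k : keys) {
                result.add(pending.containsKey(k) ? pending.get(k) : fromDb.next());
            }
            return result;
        }

        public void multiPut(List<List<Object>> keys, List<T> vals) {
            for (int i = 0; i < keys.size(); i++) pending.put(keys.get(i), vals.get(i));
            if (++putsSinceFlush >= flushEvery) flush();
        }

        // Write everything buffered so far to the database in one call.
        void flush() {
            if (!pending.isEmpty()) {
                List<List<Object>> keys = new ArrayList<>();
                List<T> vals = new ArrayList<>();
                for (Map.Entry<List<Object>, T> e : pending.entrySet()) {
                    keys.add(e.getKey());
                    vals.add(e.getValue());
                }
                db.multiPut(keys, vals);
                pending.clear();
            }
            putsSinceFlush = 0;
        }
    }

    public static void main(String[] args) {
        InMemoryDb<Long> db = new InMemoryDb<>();
        BufferedBackingMap<Long> map = new BufferedBackingMap<>(db, 5);
        List<List<Object>> keys = Arrays.asList(Arrays.<Object>asList("apple"));
        for (long batch = 1; batch <= 5; batch++) {
            map.multiPut(keys, Arrays.asList(batch)); // running count after each batch
        }
        // Five batches were committed, but the database saw a single write call.
        System.out.println("writeCalls=" + db.writeCalls + " apple=" + db.rows.get(keys.get(0)));
    }
}
```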

To retain exactly-once semantics you could instead make your batch sizes
much larger. By default they read 1MB from each Kafka partition (see the
bufferSize and fetchSize configuration options in the Kafka spout). If you
increased the batch size, and you're doing some kind of key-based
aggregation, then this would reduce the total number of writes you would
need to do to your persistent storage.

Trident could definitely be improved here, so your mileage may vary.


On Wed, Apr 9, 2014 at 9:15 AM, Raphael Hsieh <ra...@gmail.com> wrote:

> I have been struggling to figure out how to get trident to aggregate data
> over a certain time period then flush the data to an external data store.
> The reasoning behind this is to reduce the number of reads and writes sent
> to the database.
>
> I've seen that Storm allows tick tuples to be inserted into the stream,
> but I can't figure out how to do this with Trident. I had thought that
> this functionality was added in Storm version 0.8.0. Is that the case?
>
> One thing I tried was to create a new stream that emitted a tuple once
> every X time period, and then merge this stream into my actual data
> stream. However, doing this would result in a non-transactional stream,
> which would be no good. It also didn't work: the resulting stream only
> consisted of tuples from my clock stream.
>
> Can anybody help me figure out how to have Trident aggregate data over a
> certain time frame, flush it out to an external datastore, then rinse and
> repeat?
>
> There are some blogs out there about how to use a sliding window in
> Storm, but I just want sequential windows in Trident.
>
> Thanks
>
> --
> Raphael Hsieh