Posted to user@storm.apache.org by Raphael Hsieh <ra...@gmail.com> on 2014/04/17 01:28:48 UTC

How to think of batches vs partitions

Hi, I'm confused about how to think of Storm/Trident processing batches.
Are batches processed sequentially, with each batch split into multiple
partitions that are spread across the worker nodes?

Or are batches processed in parallel, spread among the worker nodes, and
then split into partitions that run on multiple threads within each host?

Thanks!
-- 
Raphael Hsieh

Re: How to think of batches vs partitions

Posted by Nathan Marz <na...@nathanmarz.com>.
You just implement IBackingMap, and then wrap it in TransactionalMap or
OpaqueTransactionalMap
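
A minimal sketch of what such a wrapper adds on top of a store that only
offers bulk reads and writes. This is not Storm's code: BackingStore,
OpaqueVal, InMemoryStore and OpaqueWrapper are hypothetical stand-ins that
only mirror the multiGet / multiPut shape discussed in this thread, and keys
are simplified to plain strings. (In storm-core the opaque wrapper is, as far
as I know, the class named OpaqueMap.) The update rule follows the opaque
transactional scheme from the Trident state wiki: keep the last batch id plus
the value before and after that batch, and redo from the "before" value when
the same batch id shows up again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OpaqueSketch {
    /** Hypothetical stand-in for a store that offers only bulk reads and writes. */
    interface BackingStore<V> {
        List<V> multiGet(List<String> keys);
        void multiPut(List<String> keys, List<V> vals);
    }

    /** Value as kept in the store: last batch id plus the values before and after that batch. */
    static final class OpaqueVal {
        final long txid;
        final long prev;
        final long curr;
        OpaqueVal(long txid, long prev, long curr) {
            this.txid = txid;
            this.prev = prev;
            this.curr = curr;
        }
    }

    /** In-memory store; a database-backed version would issue bulk queries here instead. */
    static final class InMemoryStore implements BackingStore<OpaqueVal> {
        final Map<String, OpaqueVal> data = new HashMap<>();

        public List<OpaqueVal> multiGet(List<String> keys) {
            List<OpaqueVal> out = new ArrayList<>();
            for (String k : keys) {
                out.add(data.get(k)); // null when the key is absent
            }
            return out;
        }

        public void multiPut(List<String> keys, List<OpaqueVal> vals) {
            for (int i = 0; i < keys.size(); i++) {
                data.put(keys.get(i), vals.get(i));
            }
        }
    }

    /** Wrapper that adds the batch-id bookkeeping on top of any BackingStore. */
    static final class OpaqueWrapper {
        private final BackingStore<OpaqueVal> store;

        OpaqueWrapper(BackingStore<OpaqueVal> store) {
            this.store = store;
        }

        /** Adds one batch's per-key deltas to the stored totals. */
        void addAll(long txid, Map<String, Long> deltas) {
            List<String> keys = new ArrayList<>(deltas.keySet());
            List<OpaqueVal> stored = store.multiGet(keys);
            List<OpaqueVal> next = new ArrayList<>();
            for (int i = 0; i < keys.size(); i++) {
                OpaqueVal old = stored.get(i);
                long base;
                if (old == null) {
                    base = 0;            // first write for this key
                } else if (old.txid == txid) {
                    base = old.prev;     // same batch id replayed: redo from the value before it
                } else {
                    base = old.curr;     // new batch id: build on the current value
                }
                next.add(new OpaqueVal(txid, base, base + deltas.get(keys.get(i))));
            }
            store.multiPut(keys, next);
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        OpaqueWrapper counts = new OpaqueWrapper(store);
        counts.addAll(1, Map.of("apple", 2L));
        counts.addAll(2, Map.of("apple", 3L));
        counts.addAll(2, Map.of("apple", 1L)); // batch 2 replayed with different contents
        System.out.println(store.data.get("apple").curr);
    }
}
```

In this sketch the replayed batch 2 replaces the earlier result for batch 2
instead of being added on top of it, so the final count is 2 + 1 rather than
2 + 3 + 1.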


On Thu, Apr 17, 2014 at 11:23 AM, Raphael Hsieh <ra...@gmail.com> wrote:

> oh ok,
> So if I want the guarantee of single message processing when using an
> external datastore, I need to implement the methods described here
> (https://github.com/nathanmarz/storm/wiki/Trident-state#opaque-transactional-spouts)
> myself?



-- 
Twitter: @nathanmarz
http://nathanmarz.com

Re: How to think of batches vs partitions

Posted by Raphael Hsieh <ra...@gmail.com>.
oh ok,
So if I want the guarantee of single message processing when using an
external datastore, I need to implement the methods described here
(https://github.com/nathanmarz/storm/wiki/Trident-state#opaque-transactional-spouts)
myself?



On Thu, Apr 17, 2014 at 11:00 AM, Nathan Marz <na...@nathanmarz.com> wrote:

> aggregate / partitionAggregate are only aggregations within the current
> batch, the persistent equivalents are aggregations across all batches.
>
> The logic for querying states, updating them, and keeping track of batch
> ids happens in the states themselves. For example, look at the multiUpdate
> method in TransactionalMap:
> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/state/map/TransactionalMap.java
>
> Things are structured so that TransactionalMap delegates to an
> "IBackingMap" which handles the actual persistence. IBackingMap just has
> multiGet and multiPut methods. An implementation for a database (like
> Cassandra, Riak, HBase, etc.) just has to implement IBackingMap.



-- 
Raphael Hsieh

Re: How to think of batches vs partitions

Posted by Nathan Marz <na...@nathanmarz.com>.
aggregate / partitionAggregate are only aggregations within the current
batch, the persistent equivalents are aggregations across all batches.

The logic for querying states, updating them, and keeping track of batch
ids happens in the states themselves. For example, look at the multiUpdate
method in TransactionalMap:
https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/state/map/TransactionalMap.java

Things are structured so that TransactionalMap delegates to an
"IBackingMap" which handles the actual persistence. IBackingMap just has
multiGet and multiPut methods. An implementation for a database (like
Cassandra, Riak, HBase, etc.) just has to implement IBackingMap.
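
A rough sketch of that division of labour, assuming a simplified store. It
is not the actual TransactionalMap source: BackingStore, TxVal, CountingStore
and this multiUpdate are hypothetical names, keys are plain strings, and the
aggregate is a simple sum. The point is only that the batch-id check lives in
the wrapper logic, while the store sees nothing but one bulk read and one
bulk write per update.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TransactionalSketch {
    /** Hypothetical stand-in for the persistence layer: bulk get and bulk put only. */
    interface BackingStore<V> {
        List<V> multiGet(List<String> keys);
        void multiPut(List<String> keys, List<V> vals);
    }

    /** A stored aggregate tagged with the id of the batch that last wrote it. */
    static final class TxVal {
        final long txid;
        final long value;
        TxVal(long txid, long value) {
            this.txid = txid;
            this.value = value;
        }
    }

    /** In-memory store that also counts how often each bulk method is called. */
    static final class CountingStore implements BackingStore<TxVal> {
        final Map<String, TxVal> data = new HashMap<>();
        int getCalls = 0;
        int putCalls = 0;

        public List<TxVal> multiGet(List<String> keys) {
            getCalls++;
            List<TxVal> out = new ArrayList<>();
            for (String k : keys) {
                out.add(data.get(k)); // null when the key is absent
            }
            return out;
        }

        public void multiPut(List<String> keys, List<TxVal> vals) {
            putCalls++;
            for (int i = 0; i < keys.size(); i++) {
                data.put(keys.get(i), vals.get(i));
            }
        }
    }

    /** Folds one batch's per-key partial sums into the running totals in the store. */
    static void multiUpdate(BackingStore<TxVal> store, long txid, Map<String, Long> batchSums) {
        List<String> keys = new ArrayList<>(batchSums.keySet());
        List<TxVal> current = store.multiGet(keys);      // one bulk read
        List<TxVal> updated = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            TxVal old = current.get(i);
            if (old != null && old.txid == txid) {
                updated.add(old);                        // batch already applied: keep as is
            } else {
                long base = (old == null) ? 0 : old.value;
                updated.add(new TxVal(txid, base + batchSums.get(keys.get(i))));
            }
        }
        store.multiPut(keys, updated);                   // one bulk write
    }

    public static void main(String[] args) {
        CountingStore store = new CountingStore();
        multiUpdate(store, 1, Map.of("a", 2L, "b", 1L));
        multiUpdate(store, 2, Map.of("a", 3L));
        multiUpdate(store, 2, Map.of("a", 3L)); // replay of batch 2 leaves the total unchanged
        System.out.println("a=" + store.data.get("a").value
                + " b=" + store.data.get("b").value
                + " multiGet calls=" + store.getCalls
                + " multiPut calls=" + store.putCalls);
    }
}
```

Swapping CountingStore for a class that talks to a real database would leave
multiUpdate untouched, which is the reason for keeping the store interface
this small.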


On Thu, Apr 17, 2014 at 10:15 AM, Raphael Hsieh <ra...@gmail.com> wrote:

> I guess I'm just confused as to when "multiGet" and "multiPut" are called
> when using an implementation of IBackingMap.



-- 
Twitter: @nathanmarz
http://nathanmarz.com

Re: How to think of batches vs partitions

Posted by Raphael Hsieh <ra...@gmail.com>.
I guess I'm just confused as to when "multiGet" and "multiPut" are called
when using an implementation of IBackingMap.


On Thu, Apr 17, 2014 at 8:33 AM, Raphael Hsieh <ra...@gmail.com> wrote:

> So from my understanding, this is how the different spout types guarantee
> single-message processing. For example, an opaque transactional spout will
> look at transaction ids in order to guarantee in-order batch processing,
> making sure that the txids are processed in order and using the previous
> and current values to fix any mix-ups.
>
> When doing an aggregation, does it aggregate across all batches? If so,
> how does this happen? Will it query the datastore for the current value,
> then add the current aggregate value to the stored value in order to create
> the global aggregate? Where does this logic happen? I can't seem to find
> where this happens in persistentAggregate or even partitionPersist...



-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014

Re: How to think of batches vs partitions

Posted by Raphael Hsieh <ra...@gmail.com>.
So from my understanding, this is how the different spout types guarantee
single-message processing. For example, an opaque transactional spout will
look at transaction ids in order to guarantee in-order batch processing,
making sure that the txids are processed in order and using the previous
and current values to fix any mix-ups.

When doing an aggregation, does it aggregate across all batches? If so,
how does this happen? Will it query the datastore for the current value,
then add the current aggregate value to the stored value in order to create
the global aggregate? Where does this logic happen? I can't seem to find
where this happens in persistentAggregate or even partitionPersist...


On Wed, Apr 16, 2014 at 8:30 PM, Nathan Marz <na...@nathanmarz.com> wrote:

> Batches are processed sequentially, but each batch is partitioned (and
> therefore processed in parallel). As a batch is processed, it can be
> repartitioned an arbitrary number of times throughout the Trident topology.



-- 
Raphael Hsieh

Re: How to think of batches vs partitions

Posted by Nathan Marz <na...@nathanmarz.com>.
Batches are processed sequentially, but each batch is partitioned (and
therefore processed in parallel). As a batch is processed, it can be
repartitioned an arbitrary number of times throughout the Trident topology.
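
A toy model of that description, in plain Java rather than Trident: the
outer loop handles one batch at a time, and inside each batch the partitions
are handed to a thread pool and run concurrently. BatchPartitionSketch,
partition and run are hypothetical names, hashing each tuple is just one
possible way to split a batch, and the per-partition "work" is only a count.
It does not model repartitioning or how Storm actually schedules executors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BatchPartitionSketch {
    /** Splits one batch into n partitions by hashing each tuple. */
    static List<List<String>> partition(List<String> batch, int n) {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            parts.add(new ArrayList<>());
        }
        for (String tuple : batch) {
            parts.get(Math.floorMod(tuple.hashCode(), n)).add(tuple);
        }
        return parts;
    }

    /**
     * Processes batches one after another; within a batch, every partition
     * is submitted to the pool and may run at the same time as the others.
     * Returns the number of tuples counted for each batch, in batch order.
     */
    static List<Integer> run(List<List<String>> batches, int numPartitions) {
        ExecutorService pool = Executors.newFixedThreadPool(numPartitions);
        List<Integer> perBatch = new ArrayList<>();
        try {
            for (List<String> batch : batches) {
                List<Future<Integer>> futures = new ArrayList<>();
                for (List<String> part : partition(batch, numPartitions)) {
                    Callable<Integer> task = part::size; // stand-in for real per-partition work
                    futures.add(pool.submit(task));
                }
                int total = 0;
                for (Future<Integer> f : futures) {
                    total += f.get(); // wait for all partitions before starting the next batch
                }
                perBatch.add(total);
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return perBatch;
    }

    public static void main(String[] args) {
        List<List<String>> batches = List.of(
                List.of("a", "b", "c"),
                List.of("d", "e"));
        System.out.println(run(batches, 2));
    }
}
```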


On Wed, Apr 16, 2014 at 4:28 PM, Raphael Hsieh <ra...@gmail.com> wrote:

> Hi, I'm confused about how to think of Storm/Trident processing batches.
> Are batches processed sequentially, with each batch split into multiple
> partitions that are spread across the worker nodes?
>
> Or are batches processed in parallel, spread among the worker nodes, and
> then split into partitions that run on multiple threads within each host?
>
> Thanks!
> --
> Raphael Hsieh
>
>
>
>



-- 
Twitter: @nathanmarz
http://nathanmarz.com