Posted to user@pig.apache.org by prasenjit mukherjee <pr...@gmail.com> on 2009/02/11 06:06:28 UTC

memory requirement in reduce phase

I am trying to understand the memory bottlenecks in pig's operators.
Are all the row elements of a group entry loaded into memory while
processing a FOREACH ... command?

For example, in the following code:
raw = LOAD 'report.csv' USING PigStorage(',') AS (user, usage, time);
grp_by_user = GROUP raw BY (user);
usage_counts = FOREACH grp_by_user GENERATE group, SUM(raw.usage);

In the last line, does pig load all the entries of a user's usage into
memory, or does it do the SUM iteratively without loading all the
rows?

Thanks,
Prasen

Re: memory requirement in reduce phase

Posted by Alan Gates <ga...@yahoo-inc.com>.
On Feb 12, 2009, at 12:27 AM, prasenjit mukherjee wrote:

> Really appreciate your prompt reply.  I do have some follow-up
> questions inline.
>
> On Wed, Feb 11, 2009 at 10:16 PM, Alan Gates <ga...@yahoo-inc.com>  
> wrote:
>> In general, pig first collects all of the values for a given group
>> into memory and then passes it to the UDF.  This is done to support
>> functions that need to see all of the values at once.
>
> How does pig know whether a UDF needs to see all values at once or
> whether it can be applied iteratively (like SUM)? Is the only option
> to use a combiner, as you mentioned below, or are there other ways as
> well?
The UDF must implement the Algebraic interface.  See http://wiki.apache.org/pig/UDFManual
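
For reference, here is a minimal sketch of an algebraic SUM-style UDF,
modeled on the COUNT example in that manual (MySum is a hypothetical
name, not pig's built-in SUM). Pig runs Initial on the map side,
Intermed in the combiner (possibly several times), and Final in the
reducer, so no single stage needs the whole bag:

import java.io.IOException;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class MySum extends EvalFunc<Long> implements Algebraic {

    // Fallback for when pig cannot use the combiner: sum the whole bag.
    public Long exec(Tuple input) throws IOException {
        return sum(input);
    }

    // Tell pig which class implements each stage of the computation.
    public String getInitial() { return Initial.class.getName(); }
    public String getIntermed() { return Intermed.class.getName(); }
    public String getFinal() { return Final.class.getName(); }

    // Map side: reduce the input to a partial sum.
    static public class Initial extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return TupleFactory.getInstance().newTuple(sum(input));
        }
    }

    // Combiner: fold partial sums into a new partial sum.
    static public class Intermed extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return TupleFactory.getInstance().newTuple(sum(input));
        }
    }

    // Reduce side: produce the final value from the remaining partials.
    static public class Final extends EvalFunc<Long> {
        public Long exec(Tuple input) throws IOException {
            return sum(input);
        }
    }

    // Add up the first field of every tuple in the bag.
    static protected Long sum(Tuple input) throws IOException {
        DataBag values = (DataBag) input.get(0);
        long total = 0;
        for (Tuple t : values) {
            total += ((Number) t.get(0)).longValue();
        }
        return total;
    }
}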

>
>
>>
>> Functions that can be decomposed to run over single values or
>> subsets of values (that is, they are distributive or algebraic) such
>> as SUM are distributed by use of the combiner.  So in the query you
>> give below, the combiner would be invoked (possibly multiple times)
>> between the map and reduce phases.  SUM will get all of the values
>> for a given group at once, but the upper bound on the size of this
>> set is the fan-in factor for the merge (if there are too many map
>> results, hadoop will do intermediate merges and invoke the combiner
>> on each merge before passing those results to the reducers).
>
> How do I set/configure the upper bound on this size that will be
> loaded into memory?
You don't.  Pig monitors its memory usage and spills oversized sets to  
disk as necessary.  As monitoring memory usage in java is inexact, we  
do not do a good job at this and sometimes run out of memory.

>
>
>> So, functions like SUM should not present a memory bottleneck.
>> Functions that cannot be decomposed (like a click stream analysis
>> function that wants to see a user's entire click stream at once) can
>> present bottlenecks.  Also joins can, since for an n-way join we
>> materialize n - 1 of the key sets in memory in order to do the cross
>> product.
>>
>
> Please bear with me as I am relatively new to pig/hadoop. I am not
> able to understand why cogroup (i.e. join) can be a bottleneck. Let's
> take the following join example:
>
> table_a = LOAD 'a.csv' USING PigStorage(',') AS (user, usage);
> table_b = LOAD 'b.csv' USING PigStorage(',') AS (rate, username);
> joined_table = JOIN table_a BY user, table_b BY username;
>
> Assuming:
> A = (joe,120), (joe,230), (john,100)
> B = (5,joe), (3,john)
>
> joe  = (joe,120), (joe,230)   (5,joe)
> john = (john,100)             (3,john)
>
> In the reduce phase we can do a cross product between 2 (or, in
> general, n) different streams by keeping n pointers and iterating
> over all the values. Is there a need to load ALL the entries from all
> the tables into memory?

Streams in hadoop are not rewindable.  Pig only gets to see each value  
from the stream once.  So we cache all of the keys from n - 1 inputs  
and stream through the last input.
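
Here is a hypothetical sketch of that caching in plain Java (this is
not pig's actual join operator; the strings stand in for the joe rows
from your example):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class JoinSketch {
    public static void main(String[] args) {
        // Rows arriving at the reducer for key "joe"; each iterator can
        // be consumed only once, like a hadoop value stream.
        Iterator<String> tableA = Arrays.asList("(joe,120)", "(joe,230)").iterator();
        Iterator<String> tableB = Arrays.asList("(5,joe)").iterator();

        // Cache every row of the first input -- this is the memory cost.
        List<String> cachedA = new ArrayList<String>();
        while (tableA.hasNext()) cachedA.add(tableA.next());

        // Stream the last input once, crossing each row with the cache.
        while (tableB.hasNext()) {
            String b = tableB.next();
            for (String a : cachedA)   // "rewinding" works only on the cache
                System.out.println(a + " x " + b);
        }
    }
}

With n inputs the same pattern buffers the first n - 1 and streams the
nth, which is why the pointer scheme you describe would need rewindable
streams that hadoop does not provide.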

Alan.

Re: memory requirement in reduce phase

Posted by prasenjit mukherjee <pr...@gmail.com>.
Really appreciate your prompt reply.  I do have some follow-up
questions inline.

On Wed, Feb 11, 2009 at 10:16 PM, Alan Gates <ga...@yahoo-inc.com> wrote:
> In general, pig first collects all of the values for a given group into
> memory and then passes it to the UDF.  This is done to support functions
> that need to see all of the values at once.

How does pig know whether a UDF needs to see all values at once or
whether it can be applied iteratively (like SUM)? Is the only option
to use a combiner, as you mentioned below, or are there other ways as
well?

>
> Functions that can be decomposed to run over single values or subsets of
> values (that is, they are distributive or algebraic) such as SUM are
> distributed by use of the combiner.  So in the query you give below, the
> combiner would be invoked (possibly multiple times) between the map and
> reduce phases.  SUM will get all of the values for a given group at once,
> but the upper bound on the size of this set is the fan-in factor for the merge
> (if there are too many map results, hadoop will do intermediate merges and
> invoke the combiner on each merge before passing those results to the
> reducers).

How do I set/configure the upper bound on this size that will be
loaded into memory?

> So, functions like SUM should not present a memory bottleneck.  Functions
> that cannot be decomposed (like a click stream analysis function that wants
> to see a user's entire click stream at once) can present bottlenecks.  Also
> joins can, since for an n-way join we materialize n - 1 of the key sets in
> memory in order to do the cross product.
>

Please bear with me as I am relatively new to pig/hadoop. I am not
able to understand why cogroup (i.e. join) can be a bottleneck. Let's
take the following join example:

table_a = LOAD 'a.csv' USING PigStorage(',') AS (user, usage);
table_b = LOAD 'b.csv' USING PigStorage(',') AS (rate, username);
joined_table = JOIN table_a BY user, table_b BY username;

Assuming:
A = (joe,120), (joe,230), (john,100)
B = (5,joe), (3,john)

joe  = (joe,120), (joe,230)   (5,joe)
john = (john,100)             (3,john)

In the reduce phase we can do a cross product between 2 (or, in
general, n) different streams by keeping n pointers and iterating
over all the values. Is there a need to load ALL the entries from all
the tables into memory?

Thanks in advance,
Prasen

> Alan.
>
> On Feb 10, 2009, at 9:06 PM, prasenjit mukherjee wrote:
>
>> I am trying to understand the memory bottlenecks in pig's operators.
>> Are all the row elements of a group entry loaded into memory while
>> processing a FOREACH ... command?
>>
>> For example, in the following code:
>> raw = LOAD 'report.csv' USING PigStorage(',') AS (user, usage, time);
>> grp_by_user = GROUP raw BY (user);
>> usage_counts = FOREACH grp_by_user GENERATE group, SUM(raw.usage);
>>
>> In the last line, does pig load all the entries of a user's usage into
>> memory, or does it do the SUM iteratively without loading all the
>> rows?
>>
>> Thanks,
>> Prasen
>
>

Re: memory requirement in reduce phase

Posted by Alan Gates <ga...@yahoo-inc.com>.
In general, pig first collects all of the values for a given group  
into memory and then passes it to the UDF.  This is done to support  
functions that need to see all of the values at once.

Functions that can be decomposed to run over single values or subsets  
of values (that is, they are distributive or algebraic) such as SUM  
are distributed by use of the combiner.  So in the query you give  
below, the combiner would be invoked (possibly multiple times) between  
the map and reduce phases.  SUM will get all of the values for a given  
group at once, but the upper bound on the size of this set is the
fan-in factor for the merge (if there are too many map results, hadoop
will do intermediate merges and invoke the combiner on each merge
before passing those results to the reducers).
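
To make that concrete, here is a hypothetical trace for the query in
your mail (the usage values are invented for illustration):

map 1 output:      (joe, {120, 230})
combiner, map 1:   (joe, 350)
map 2 output:      (joe, {100}), (ann, {40})
combiner, map 2:   (joe, 100), (ann, 40)
reduce input:      (joe, {350, 100}), (ann, {40})
final SUM:         (joe, 450), (ann, 40)

No single invocation of SUM sees more values than the number of
partial results feeding it.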

So, functions like SUM should not present a memory bottleneck.   
Functions that cannot be decomposed (like a click stream analysis  
function that wants to see a user's entire click stream at once) can  
present bottlenecks.  Also joins can, since for an n-way join we
materialize n - 1 of the key sets in memory in order to do the cross
product.
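
For contrast, here is a hypothetical sketch of a UDF of that
non-decomposable kind (the name and the 30-minute session rule are
invented). It has to sort the user's entire click stream before it can
emit anything, so pig must materialize the whole bag for it:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

public class SessionCount extends EvalFunc<Long> {
    private static final long GAP = 30L * 60L * 1000L;  // 30 minutes in ms

    public Long exec(Tuple input) throws IOException {
        DataBag clicks = (DataBag) input.get(0);
        // Every timestamp is needed before any output can be produced,
        // so the whole click stream sits in memory at once.
        List<Long> times = new ArrayList<Long>();
        for (Tuple t : clicks) {
            times.add(((Number) t.get(0)).longValue());
        }
        if (times.isEmpty()) return 0L;
        Collections.sort(times);
        long sessions = 1;
        for (int i = 1; i < times.size(); i++) {
            if (times.get(i) - times.get(i - 1) > GAP) sessions++;
        }
        return sessions;
    }
}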

Alan.

On Feb 10, 2009, at 9:06 PM, prasenjit mukherjee wrote:

> I am trying to understand the memory bottlenecks in pig's operators.
> Are all the row elements of a group entry loaded into memory while
> processing a FOREACH ... command?
>
> For example, in the following code:
> raw = LOAD 'report.csv' USING PigStorage(',') AS (user, usage, time);
> grp_by_user = GROUP raw BY (user);
> usage_counts = FOREACH grp_by_user GENERATE group, SUM(raw.usage);
>
> In the last line, does pig load all the entries of a user's usage into
> memory, or does it do the SUM iteratively without loading all the
> rows?
>
> Thanks,
> Prasen