Posted to user@pig.apache.org by prasenjit mukherjee <pr...@gmail.com> on 2009/02/11 06:06:28 UTC
memory requirement in reducephase
I am trying to understand the memory bottleneck in pig's operators.
Are all the row elements of a group loaded into memory while
processing a FOREACH ... command?
For example in the following code :
raw = LOAD 'report.csv' USING PigStorage(',') AS (user, usage, time);
grp_by_user = GROUP raw BY (user);
usage_counts = FOREACH grp_by_user GENERATE group, SUM(raw.usage);
In the last line, does pig load all the entries of a user's usage into
memory, or does it compute the SUM iteratively without loading all of
the rows?
Thanks,
Prasen
Re: memory requirement in reducephase
Posted by Alan Gates <ga...@yahoo-inc.com>.
On Feb 12, 2009, at 12:27 AM, prasenjit mukherjee wrote:
> Really appreciate your prompt reply. I do have some follow up
> questions inline.
>
> On Wed, Feb 11, 2009 at 10:16 PM, Alan Gates <ga...@yahoo-inc.com>
> wrote:
>> In general, pig first collects all of the values for a given group
>> into
>> memory and then passes it to the UDF. This is done to support
>> functions
>> that need to see all of the values at once.
>
> How does pig know whether a UDF needs to see all values at once
> or whether it can be applied iteratively (like SUM)? Is the only option
> to use a combiner, as you have mentioned below, or are there other
> ways as well?
The UDF must implement the Algebraic interface. See http://wiki.apache.org/pig/UDFManual
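Roughly, an algebraic function decomposes into three stages that hadoop can run in the map, combine, and reduce phases. A toy Python sketch of that decomposition for SUM (the real interface is Java, described in the UDFManual; the function names here are illustrative, not Pig's actual API):

```python
# Toy sketch of the three-stage algebraic decomposition a function like
# SUM uses. Illustrative only -- Pig's real Algebraic interface is Java.

def initial(values):
    # Map phase: produce a partial sum from the value(s) seen on one map.
    return sum(values)

def intermed(partials):
    # Combiner: may run several times, merging partial sums.
    return sum(partials)

def final(partials):
    # Reduce phase: merge the remaining partial sums into the answer.
    return sum(partials)

# For the usage_counts example: three map-side values for one user.
map_partials = [initial([120]), initial([230]), initial([100])]
combined = intermed(map_partials[:2])        # one combiner invocation
total = final([combined, map_partials[2]])   # 450
```

Because every stage only ever sees a handful of partial results, no stage needs the whole bag of values in memory at once.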
>
>
>>
>> Functions that can be decomposed to run over single values or
>> subsets of
>> values (that is, they are distributive or algebraic) such as SUM are
>> distributed by use of the combiner. So in the query you give
>> below, the
>> combiner would be invoked (possibly multiple times) between the map
>> and
>> reduce phases. SUM will get all of the values for a given group at
>> once,
>> but the upper bound for the size of this set is the fan in factor
>> for merge
>> (if there are too many map results, hadoop will do intermediate
>> merges and
>> invoke the combiner on each merge before passing those results to the
>> reducers).
>
> How do I set/configure the upper bound of this size which will be
> loaded into memory?
You don't. Pig monitors its memory usage and spills oversized sets to
disk as necessary. As monitoring memory usage in java is inexact, we
do not do a good job at this and sometimes run out of memory.
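To illustrate the idea (this is a toy sketch, not Pig's actual spill implementation), a bag that moves its contents to disk once it passes a size threshold might look like this in Python:

```python
import tempfile

class SpillableBag:
    """Toy sketch of a bag that spills to disk past a size threshold.
    Pig's real spill logic is driven by memory monitoring; this only
    shows the shape of the idea."""

    def __init__(self, max_in_memory=1000):
        self.max_in_memory = max_in_memory
        self.in_memory = []
        self.spill_file = None

    def add(self, value):
        if len(self.in_memory) >= self.max_in_memory:
            self._spill()
        self.in_memory.append(value)

    def _spill(self):
        # Move the in-memory portion to a temp file on disk.
        if self.spill_file is None:
            self.spill_file = tempfile.NamedTemporaryFile(mode="a+")
        for v in self.in_memory:
            self.spill_file.write(f"{v}\n")
        self.in_memory = []

    def __iter__(self):
        # Replay the spilled portion first, then the in-memory tail.
        if self.spill_file is not None:
            self.spill_file.flush()
            self.spill_file.seek(0)
            for line in self.spill_file:
                yield int(line)
        yield from self.in_memory

bag = SpillableBag(max_in_memory=2)
for v in [1, 2, 3, 4, 5]:
    bag.add(v)
total = sum(bag)   # values 1 and 2 (and 3 and 4) came back from disk
```

The price of spilling is the extra disk I/O when the bag is re-read, which is why algebraic functions that never materialize the bag are preferable.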
>
>
>> So, functions like SUM should not present a memory bottleneck.
>> Functions
>> that cannot be decomposed (like a click stream analysis function
>> that wants
>> to see a user's entire click stream at once) can present
>> bottlenecks. Also
>> joins can, since for an n way join we materialize n - 1 of the key
>> sets in
>> memory in order to do the cross product.
>>
>
> Please bear with me as I am relatively new to pig/hadoop. I am not
> able to understand why cogroup (i.e. join) can be a bottleneck. Let's
> take the following join example:
>
> table_a = LOAD 'a.csv' USING PigStorage(',') AS (user, usage);
> table_b = LOAD 'b.csv' USING PigStorage(',') AS (rate, username);
> joined_table = JOIN table_a BY user, table_b BY username;
>
> Assuming
> A = (joe, 120),(joe,230),(john,100)
> B = (5, joe), (3,john)
>
> joe = (joe,120),(joe,230) (5,joe)
> john = (john,100) (3,john)
>
> In the reduce phase we can do a cross product between two (or n)
> different streams by keeping n pointers and iterating over all the
> values. Is there a need to load ALL the entries from all the tables
> into memory?
Streams in hadoop are not rewindable. Pig only gets to see each value
from the stream once. So we cache all of the values for a given key
from n - 1 inputs and stream through the last input.
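A toy Python sketch of that reduce-side strategy (illustrative only, not Pig's actual code): the tuples for the key from all but the last input are held in memory so they can be revisited, while the last input is consumed in a single pass:

```python
def reduce_side_join(cached_inputs, streamed_input):
    """Cross product for one join key: cached_inputs is a list of
    materialized (in-memory) tuple lists, one per input; streamed_input
    is a one-pass iterator over the last input's tuples."""
    for last_tuple in streamed_input:       # single pass over the stream
        # Cross the cached inputs (they are rewindable because they
        # live in memory), then attach the streamed tuple.
        combos = [()]
        for cached in cached_inputs:
            combos = [c + (t,) for c in combos for t in cached]
        for c in combos:
            yield c + (last_tuple,)

# Tuples for key 'joe' from the example: table_a cached, table_b streamed.
a_joe = [("joe", 120), ("joe", 230)]
b_joe = iter([(5, "joe")])
result = list(reduce_side_join([a_joe], b_joe))
# result: [(('joe', 120), (5, 'joe')), (('joe', 230), (5, 'joe'))]
```

If the stream could be rewound, none of the inputs would need materializing; since it cannot, n - 1 of them must fit in memory (or spill).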
Alan.
Re: memory requirement in reducephase
Posted by prasenjit mukherjee <pr...@gmail.com>.
Really appreciate your prompt reply. I do have some follow up
questions inline.
On Wed, Feb 11, 2009 at 10:16 PM, Alan Gates <ga...@yahoo-inc.com> wrote:
> In general, pig first collects all of the values for a given group into
> memory and then passes it to the UDF. This is done to support functions
> that need to see all of the values at once.
How does pig know whether a UDF needs to see all values at once
or whether it can be applied iteratively (like SUM)? Is the only option
to use a combiner, as you have mentioned below, or are there other
ways as well?
>
> Functions that can be decomposed to run over single values or subsets of
> values (that is, they are distributive or algebraic) such as SUM are
> distributed by use of the combiner. So in the query you give below, the
> combiner would be invoked (possibly multiple times) between the map and
> reduce phases. SUM will get all of the values for a given group at once,
> but the upper bound for the size of this set is the fan in factor for merge
> (if there are too many map results, hadoop will do intermediate merges and
> invoke the combiner on each merge before passing those results to the
> reducers).
How do I set/configure the upper bound of this size which will be
loaded into memory?
> So, functions like SUM should not present a memory bottleneck. Functions
> that cannot be decomposed (like a click stream analysis function that wants
> to see a user's entire click stream at once) can present bottlenecks. Also
> joins can, since for an n way join we materialize n - 1 of the key sets in
> memory in order to do the cross product.
>
Please bear with me as I am relatively new to pig/hadoop. I am not
able to understand why cogroup (i.e. join) can be a bottleneck. Let's
take the following join example:
table_a = LOAD 'a.csv' USING PigStorage(',') AS (user, usage);
table_b = LOAD 'b.csv' USING PigStorage(',') AS (rate, username);
joined_table = JOIN table_a BY user, table_b BY username;
Assuming
A = (joe, 120),(joe,230),(john,100)
B = (5, joe), (3,john)
joe = (joe,120),(joe,230) (5,joe)
john = (john,100) (3,john)
In the reduce phase we can do a cross product between two (or n)
different streams by keeping n pointers and iterating over all the
values. Is there a need to load ALL the entries from all the tables
into memory?
Thanks in advance,
Prasen
> Alan.
>
> On Feb 10, 2009, at 9:06 PM, prasenjit mukherjee wrote:
>
>> I am trying to understand the memory bottleneck in pig's operators.
>> Are all the row elements of a group loaded into memory while
>> processing a FOREACH ... command?
>>
>> For example in the following code :
>> raw = LOAD 'report.csv' USING PigStorage(',') AS (user, usage, time);
>> grp_by_user = GROUP raw BY (user);
>> usage_counts = FOREACH grp_by_user GENERATE group, SUM(raw.usage);
>>
>> In the last line, does pig load all the entries of a user's usage into
>> memory, or does it compute the SUM iteratively without loading all of
>> the rows?
>>
>> Thanks,
>> Prasen
>
>
Re: memory requirement in reducephase
Posted by Alan Gates <ga...@yahoo-inc.com>.
In general, pig first collects all of the values for a given group
into memory and then passes it to the UDF. This is done to support
functions that need to see all of the values at once.
Functions that can be decomposed to run over single values or subsets
of values (that is, they are distributive or algebraic) such as SUM
are distributed by use of the combiner. So in the query you give
below, the combiner would be invoked (possibly multiple times) between
the map and reduce phases. SUM will get all of the values for a given
group at once, but the upper bound for the size of this set is the fan
in factor for merge (if there are too many map results, hadoop will do
intermediate merges and invoke the combiner on each merge before
passing those results to the reducers).
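To make that fan-in bound concrete, here is a toy Python sketch (illustrative, not hadoop's actual merge code) of how running the combiner on each intermediate merge keeps the set of values any single SUM call sees bounded by the fan-in factor:

```python
# Toy model of intermediate merges with a combiner. With fan-in F,
# map outputs for one group are merged F at a time, and the combiner
# collapses each merge to one partial sum, so no single SUM invocation
# ever sees more than F values.

def merge_with_combiner(partials, fan_in):
    while len(partials) > fan_in:
        merged, rest = partials[:fan_in], partials[fan_in:]
        partials = [sum(merged)] + rest   # combiner runs on one merge
    return sum(partials)                  # final call sees <= fan_in values

# Five map-side partial sums for one group, merged two at a time.
total = merge_with_combiner([120, 230, 100, 50, 75], fan_in=2)
```

This is why an algebraic SUM stays memory-safe regardless of how many map results a group has.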
So, functions like SUM should not present a memory bottleneck.
Functions that cannot be decomposed (like a click stream analysis
function that wants to see a user's entire click stream at once) can
present bottlenecks. Also joins can, since for an n way join we
materialize n - 1 of the key sets in memory in order to do the cross
product.
Alan.
On Feb 10, 2009, at 9:06 PM, prasenjit mukherjee wrote:
> I am trying to understand the memory bottleneck in pig's operators.
> Are all the row elements of a group loaded into memory while
> processing a FOREACH ... command?
>
> For example in the following code :
> raw = LOAD 'report.csv' USING PigStorage(',') AS (user, usage, time);
> grp_by_user = GROUP raw BY (user);
> usage_counts = FOREACH grp_by_user GENERATE group, SUM(raw.usage);
>
> In the last line, does pig load all the entries of a user's usage into
> memory, or does it compute the SUM iteratively without loading all of
> the rows?
>
> Thanks,
> Prasen