Posted to dev@pig.apache.org by Alan Gates <ga...@yahoo-inc.com> on 2008/08/01 17:12:12 UTC

Re: [jira] Created: (PIG-350) Join optimization for pipeline rework

In the current version, the combiner is not used with cogroup.  With the 
pipeline rework going on in the types branch, the combiner will be used 
for cogroups like:

C = cogroup A by $0, B by $0;
D = foreach C generate project, algebraic, algebraic, ...

where project is a non-UDF expression that projects fields from C and 
algebraic represents an algebraic UDF on one of the fields of C.  
Projections that are flattened will not be combined, because all the 
records are necessary to properly materialize the cross product.  So 
that means the optimization proposed in PIG-350 won't interact with the 
combiner.
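
For concreteness, a sketch of the two cases, with the cogroup above 
(SUM and COUNT being algebraic builtins, and the field positions made 
up):

-- combinable: projects the group key and applies algebraic UDFs
D = foreach C generate group, SUM(A.$1), COUNT(B);
-- not combinable: the flattens need the full cross product of the bags
E = foreach C generate flatten(A), flatten(B);

The initial phase of SUM and COUNT can run in the combiner on partial 
bags, while the flattened form cannot be evaluated on partial bags.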

As far as cross and the combiner, we don't yet have a combiner algorithm 
for optimizing cross.  This is doable but complicated.  Are you 
currently using cross?  We had not focused on this as an optimization 
area because we were not aware of people who used it.

You mention using the combiner with filters.  Were you wanting us to 
catch cases like:

B = group A by $0;
C = filter B by $0 > 5;
D = foreach C generate group, COUNT(A);

and push both the filter and the foreach into the combiner?  That is 
possible, but we have put that off in favor of instead pushing the 
filter above the group.  (We don't do this filter pushing yet, but work 
is ongoing to develop an optimizer that will do these kinds of 
optimizations.)  The only case we could think of where you wouldn't want 
to push the filter (and we won't) is when the filter involves a UDF 
which might be very expensive to call, so you want to wait until after 
the data is grouped to minimize the number of calls to the UDF.
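
To illustrate, with the filter pushed above the group, the script above 
becomes something like this (a sketch; A1 is just a hypothetical 
intermediate alias):

A1 = filter A by $0 > 5;
B = group A1 by $0;
D = foreach B generate group, COUNT(A1);

Since the grouping key is $0, filtering first gives the same result 
while shrinking the data before the shuffle, and the foreach remains 
combinable because COUNT is algebraic.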

Alan.

Mridul Muralidharan wrote:
>
> This would be absolutely great!
> Btw, I hope this continues to work fine with combiners in the case of 
> COGROUP + FILTER (combiners are applicable in this case, right? Or 
> only for group?).
>
> Additionally, what would the impact of this be on CROSS + FILTER? (I 
> am assuming that CROSS + FILTER is not combinable currently)
>
>
> Thanks,
> Mridul
>
> Alan Gates (JIRA) wrote:
>> Join optimization for pipeline rework
>> -------------------------------------
>>
>>                  Key: PIG-350
>>                  URL: https://issues.apache.org/jira/browse/PIG-350
>>              Project: Pig
>>           Issue Type: Bug
>>           Components: impl
>>     Affects Versions: types_branch
>>             Reporter: Alan Gates
>>             Assignee: Daniel Dai
>>             Priority: Critical
>>              Fix For: types_branch
>>
>>
>> Currently, joins in Pig are done as groupings where each input is 
>> grouped on the join key.  In the reduce phase, records from each 
>> input are collected into a bag for each key, and then a cross product 
>> is done on these bags.  This can be optimized by selecting one 
>> (hopefully the largest) input and streaming through it rather than 
>> placing the results in a bag.  This will result in better memory 
>> usage, fewer spills to disk due to bag overflow, and better 
>> performance.  Ideally, the system would intelligently select which 
>> input to stream, based on a histogram of value distributions for the 
>> keys.  Pig does not have that kind of metadata.  So for now it is 
>> best to always pick the same input (first or last) so that the user 
>> can select which input to stream.
>>
>> Similarly, order by in Pig is done in this same way, with the 
>> grouping keys being the ordering keys, and only one input.  In this 
>> case Pig still currently collects all the records for a key into a 
>> bag, and then flattens the bag.  This is a total waste, and in some 
>> cases causes significant performance degradation.  The same 
>> optimization listed above can address this case, where the last bag 
>> (in this case the only bag) is streamed rather than collected.
>>
>> To do these operations, a new POJoinPackage will be needed.  It will 
>> replace POPackage and the following POForEach in these types of 
>> scripts, and will handle pulling the records from Hadoop and 
>> streaming them into the Pig pipeline.  A visitor will need to be 
>> added in the map-reduce compilation phase that detects this case and 
>> combines the POPackage with POForEach into this new POJoinPackage.
>>
>

Re: [jira] Created: (PIG-350) Join optimization for pipeline rework

Posted by Mridul Muralidharan <mr...@yahoo-inc.com>.
Hi Alan,

   Please see inline.

Regards,
Mridul

Alan Gates wrote:
> In the current version, the combiner is not used with cogroup.  With the 
> pipeline rework going on in the types branch, the combiner will be used 
> for cogroups like:
> 
> C = cogroup A by $0, B by $0;
> D = foreach C generate project, algebraic, algebraic, ...
> 
> where project is a non-UDF expression that projects fields from C and 
> algebraic represents an algebraic UDF on one of the fields of C.  
> Projections that are flattened will not be combined, because all the 
> records are necessary to properly materialize the cross product.  So 
> that means the optimization proposed in PIG-350 won't interact with the 
> combiner.


This is great to hear; I should have followed this a bit more carefully.
I will need to consider your comment above a bit more later on, thanks!

> 
> As far as cross and the combiner, we don't yet have a combiner algorithm 
> for optimizing cross.  This is doable but complicated.  Are you 
> currently using cross?  We had not focused on this as an optimization 
> area because we were not aware of people who used it.


One primary use case which comes to mind for cross is something like 
this: in spite of the obvious cost, we are having to use CROSS to apply 
join constraints across 'tables' (SQL-like psox joins, not normal Pig 
table joins).  So we currently end up with something like this:


entity1 = ...; -- apply entity1 constraints from relevant table(s)
entity2 = ...; -- apply entity2 constraints from relevant table(s)
entity3 = ...; -- apply entity3 constraints from relevant table(s)

-- for inter-entity constraints:
cross_res = CROSS entity1, entity2, entity3, ...;
constrained_op = FOREACH cross_res {
   valid = ...; -- apply inter-entity constraints, computing a validity flag
   GENERATE valid, entity1, entity2, entity3;
};
constrained_op = FILTER constrained_op BY valid == '1';

This could be split into a series of cogroups and filters (imo), but I 
have not looked into that too much (purely in the interest of time).
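
For instance, if one of the inter-entity constraints is an equality on 
some shared key field k, I imagine a sketch like this would avoid the 
full cross product:

grouped = COGROUP entity1 BY k, entity2 BY k;
pairs = FOREACH grouped GENERATE FLATTEN(entity1), FLATTEN(entity2);
constrained = FILTER pairs BY ...; -- remaining inter-entity constraints

Only rows that agree on k would ever meet, which is effectively a join.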

But as should be obvious here, the cost of cross is extremely high 
(even though the entity tables are already constrained, the cross can 
'blow up') ... and most (a very high percentage) of the output is going 
to be discarded.
Which is why, if there is any form of combiner which is applicable, or 
any optimization which is possible here, it would be tremendously 
beneficial for us - or indeed any form of optimization on cross, for 
that matter.

> 
> You mention using the combiner with filters.  Were you wanting us to 
> catch cases like:
> 
> B = group A by $0;
> C = filter B by $0 > 5;
> D = foreach C generate group, COUNT(A);
> 
> and push both the filter and the foreach into the combiner?  That is 
> possible, but we have put that off in favor of instead pushing the 
> filter above the group.  (We don't do this filter pushing yet, but work 
> is ongoing to develop an optimizer that will do these kinds of 
> optimizations.)  The only case we could think of where you wouldn't want 

My current assumption was that group followed by foreach is combinable.
So my thought is to use the foreach also as a filter - to generate a 
validity flag which can be filtered on later, and to generate data only 
if the row is valid (otherwise it will be empty, thereby saving on data 
size).
But if the filter, or filter + foreach, or combinations thereof could 
be pushed into the combiner, that would be absolutely great.
I did not worry too much about it since we have not yet got to the 
stage where these things are a concern (I am still at the early 
prototype stage, working on it when I get time).
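
Something like this sketch is what I had in mind (the threshold and 
field positions are made up):

B = group A by $0;
C = foreach B generate group, COUNT(A) as cnt,
    ((COUNT(A) > 10) ? 1 : 0) as valid;
D = filter C by valid == 1;

My hope is that, since COUNT is algebraic, the foreach can still run 
partially in the combiner, and the filter then discards the invalid 
rows cheaply.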



Thanks,
Mridul

> to push the filter (and we won't) is when the filter involves a UDF 
> which might be very expensive to call, so you want to wait until after 
> the data is grouped to minimize the number of calls to the UDF.
> 
> Alan.
> 