Posted to dev@pig.apache.org by "Daniel Dai (JIRA)" <ji...@apache.org> on 2008/08/07 00:02:44 UTC

[jira] Updated: (PIG-350) Join optimization for pipeline rework

     [ https://issues.apache.org/jira/browse/PIG-350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Daniel Dai updated PIG-350:
---------------------------

    Attachment: join.patch

Here are some notes for this patch:
# The criteria for join optimization
   a. POPackage->POForEach is the root of reduce plan
   b. POUnion is the leaf of map plan (so that we exclude distinct, sort...)
   c. No combiner plan
   d. POForEach nested plan only contains POProject in any depth
# If the MR plan is not eligible for optimization, everything stays the same
# The first alias in the cogroup statement is the one we are going to stream.
# In the optimized case, we change the output key format for map-reduce: originally it is (key, IndexedTuple); now it is ((old_key, index), IndexedTuple)
# Here is the layout of the partitioner and comparators for the new key
   #* Partition key: (old_key, index)
   #* OutputKeyComparator: by old_key + reverse index
   #* OutputValueGroupingComparator: by old_key
   With this layout, we get
   ## If two inputs share the same old_key, they go to the same reducer
   ## For every old_key, the reduce function is called once
   ## The values iterator is sorted in reverse index order, so that we can materialize n-1 inputs, then stream the first input
# Implementation detail for POJoinPackage:
   a. For every tuple in the first input, combine it with the materialized n-1 inputs and feed the result to the delegated ForEach operator
   b. Call getNext on ForEach until nothing is available, then feed the next tuple
   c. Maintain internal state, since we feed POJoinPackage once and then call getNext multiple times
# For performance reasons, I use a byte-level comparator for OutputKeyComparator. Hadoop has no byte-level comparator for the partitioner or OutputValueGroupingComparator, probably because OutputKeyComparator is more performance-critical: the partitioner and OutputValueGroupingComparator are called once per tuple, while OutputKeyComparator is called more frequently
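
The key layout and the POJoinPackage steps described in the notes above can be sketched in plain Java, without any Hadoop or Pig classes. This is only an illustration under stated assumptions, not the code in join.patch: JoinKey, JoinRecord and JoinLayoutSketch are hypothetical names, tuples are plain strings, the partitioner is assumed to hash old_key only (which is what sending equal old_keys to the same reducer requires), and the cross product stands in for the delegated ForEach. The sketch also collects its output eagerly instead of keeping state across getNext calls.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for the new map output key (old_key, index).
final class JoinKey {
    final String oldKey; // the original join key
    final int index;     // cogroup input the tuple came from (0 = first alias)
    JoinKey(String oldKey, int index) { this.oldKey = oldKey; this.index = index; }
}

// Hypothetical stand-in for one map output record: composite key plus payload.
final class JoinRecord {
    final JoinKey key;
    final String tuple;
    JoinRecord(String oldKey, int index, String tuple) {
        this.key = new JoinKey(oldKey, index);
        this.tuple = tuple;
    }
}

final class JoinLayoutSketch {
    // Assumption: only old_key is hashed, so every input for a given
    // old_key lands on the same reducer.
    static int partition(JoinKey k, int numReducers) {
        return (k.oldKey.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Sort order: old_key ascending, then index descending (reverse index).
    static final Comparator<JoinKey> SORT = (a, b) -> {
        int c = a.oldKey.compareTo(b.oldKey);
        return c != 0 ? c : Integer.compare(b.index, a.index);
    };

    // Grouping: all keys with the same old_key belong to one reduce call.
    static final Comparator<JoinKey> GROUP = (a, b) -> a.oldKey.compareTo(b.oldKey);

    // Simulates a single reducer: sort, split into groups, join each group.
    static List<List<String>> join(List<JoinRecord> mapOutput, int numInputs) {
        List<JoinRecord> sorted = new ArrayList<>(mapOutput);
        sorted.sort((x, y) -> SORT.compare(x.key, y.key));
        List<List<String>> out = new ArrayList<>();
        int start = 0;
        for (int i = 1; i <= sorted.size(); i++) {
            boolean groupEnds = i == sorted.size()
                || GROUP.compare(sorted.get(start).key, sorted.get(i).key) != 0;
            if (groupEnds) {
                reduceOneKey(sorted.subList(start, i), numInputs, out);
                start = i;
            }
        }
        return out;
    }

    // Values arrive in reverse index order: inputs n-1..1 are materialized
    // into bags first, then each tuple of input 0 is streamed against them.
    static void reduceOneKey(List<JoinRecord> group, int numInputs,
                             List<List<String>> out) {
        List<List<String>> bags = new ArrayList<>();
        for (int i = 1; i < numInputs; i++) bags.add(new ArrayList<>());
        for (JoinRecord r : group) {
            if (r.key.index > 0) {
                bags.get(r.key.index - 1).add(r.tuple); // materialize
            } else {
                List<String> prefix = new ArrayList<>();
                prefix.add(r.tuple);                    // streamed tuple
                cross(bags, 0, prefix, out);
            }
        }
    }

    // Cross product of one streamed tuple with the materialized bags.
    private static void cross(List<List<String>> bags, int i,
                              List<String> prefix, List<List<String>> out) {
        if (i == bags.size()) { out.add(new ArrayList<>(prefix)); return; }
        for (String t : bags.get(i)) {
            prefix.add(t);
            cross(bags, i + 1, prefix, out);
            prefix.remove(prefix.size() - 1);
        }
    }
}
```

For example, with two inputs where key a has tuples a0x and a0y in input 0 and a1 in input 1, and key b appears only in input 0, JoinLayoutSketch.join(records, 2) produces [a0x, a1] and [a0y, a1]. Only the a1 bag is ever held in memory, and key b produces nothing because its input 1 bag is empty.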


Possible improvement:
# Allow the user to give a hint for which alias to stream; we can implement this easily by changing OutputKeyComparator
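
One way such a hint could be reflected in the sort order is sketched below in plain Java. This is an assumption about how the comparator might change, not something the patch contains: HintedKey, StreamHintSketch and the streamedIndex parameter are hypothetical names. The idea is that the hinted input sorts last within each old_key, so it is the one that reaches the reducer after all other inputs have been materialized.

```java
import java.util.Comparator;

// Hypothetical stand-in for the composite key (old_key, index).
final class HintedKey {
    final String oldKey;
    final int index;
    HintedKey(String oldKey, int index) { this.oldKey = oldKey; this.index = index; }
    @Override public String toString() { return oldKey + "#" + index; }
}

final class StreamHintSketch {
    // Sorts by old_key, then places the hinted input last for that key;
    // the remaining inputs keep reverse index order.
    static Comparator<HintedKey> sortOrder(int streamedIndex) {
        return (a, b) -> {
            int c = a.oldKey.compareTo(b.oldKey);
            if (c != 0) return c;
            boolean aStreamed = a.index == streamedIndex;
            boolean bStreamed = b.index == streamedIndex;
            if (aStreamed != bStreamed) return aStreamed ? 1 : -1;
            return Integer.compare(b.index, a.index);
        };
    }
}
```

With streamedIndex set to 0 this reduces to the plain old_key plus reverse index order described in the notes. The reduce side would also need to know which index is streamed, which this sketch does not cover.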

> Join optimization for pipeline rework
> -------------------------------------
>
>                 Key: PIG-350
>                 URL: https://issues.apache.org/jira/browse/PIG-350
>             Project: Pig
>          Issue Type: Bug
>          Components: impl
>    Affects Versions: types_branch
>            Reporter: Alan Gates
>            Assignee: Daniel Dai
>            Priority: Critical
>             Fix For: types_branch
>
>         Attachments: join.patch
>
>
> Currently, joins in Pig are done as groupings where each input is grouped on the join key.  In the reduce phase, records from each input are collected into a bag for each key, and then a cross product is done on these bags.  This can be optimized by selecting one (hopefully the largest) input and streaming through it rather than placing the results in a bag.  This will result in better memory usage, fewer spills to disk due to bag overflow, and better performance.  Ideally, the system would intelligently select which input to stream, based on a histogram of value distributions for the keys.  Pig does not have that kind of metadata.  So for now it is best to always pick the same input (first or last) so that the user can select which input to stream.
> Similarly, order by in Pig is done in this same way, with the grouping keys being the ordering keys, and only one input.  In this case Pig still currently collects all the records for a key into a bag, and then flattens the bag.  This is a total waste, and in some cases causes significant performance degradation.  The same optimization listed above can address this case, where the last bag (in this case the only bag) is streamed rather than collected.
> To do these operations, a new POJoinPackage will be needed.  It will replace POPackage and the following POForEach in these types of scripts, handling pulling the records from Hadoop and streaming them into the Pig pipeline.  A visitor will need to be added in the map reduce compilation phase that detects this case and combines the POPackage with POForEach into this new POJoinPackage.
