You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pig.apache.org by "Santhosh Srinivasan (JIRA)" <ji...@apache.org> on 2009/06/01 18:29:07 UTC

[jira] Commented: (PIG-697) Proposed improvements to pig's optimizer

    [ https://issues.apache.org/jira/browse/PIG-697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12715143#action_12715143 ] 

Santhosh Srinivasan commented on PIG-697:
-----------------------------------------

The graph operation pushAfter was added as a complementary operation to pushBefore. Currently, on the logical side, there are no concrete use cases for pushAfter. The only operator that truly supports multiple outputs is split. Our current model for split is to have an no-op split operator that has multiple successors, split outputs, each of which is the equivalent of a filter. The split output has inner plans which could have projection operators that hold references to the split's predecessor. 

When an operator is pushed after split, the operator will be placed between the split and split output. As a result, when rewire on split is called, the call is dispatched to the split output. The references in the split output after the rewire will now point to split's predecessor instead of pointing to the operator that was pushed after.

The intention of the pushAfter in the case of a split is to push it after the split output. However, the generic pushAfter operation does not distinguish between split and split output. A possible way out is to override this method in the logical plan and duplicate most of the code in the OperatorPlan and add new code to handle split.

As of now, the pushAfter will not be used in the logical layer.


> Proposed improvements to pig's optimizer
> ----------------------------------------
>
>                 Key: PIG-697
>                 URL: https://issues.apache.org/jira/browse/PIG-697
>             Project: Pig
>          Issue Type: Bug
>          Components: impl
>            Reporter: Alan Gates
>            Assignee: Santhosh Srinivasan
>         Attachments: OptimizerPhase1.patch, OptimizerPhase1_part2.patch, OptimizerPhase2.patch, OptimizerPhase3_parrt1-1.patch, OptimizerPhase3_parrt1.patch
>
>
> I propose the following changes to pig optimizer, plan, and operator functionality to support more robust optimization:
> 1) Remove the required array from Rule.  This will change rules so that they only match exact patterns instead of allowing missing elements in the pattern.
> This has the downside that if a given rule applies to two patterns (say Load->Filter->Group, Load->Group) you have to write two rules.  But it has the upside that
> the resulting rules know exactly what they are getting.  The original intent of this was to reduce the number of rules that needed to be written.  But the
> resulting rules have do a lot of work to understand the operators they are working with.  With exact matches only, each rule will know exactly the operators it
> is working on and can apply the logic of shifting the operators around.  All four of the existing rules set all entries of required to true, so removing this
> will have no effect on them.
> 2) Change PlanOptimizer.optimize to iterate over the rules until there are no conversions or a certain number of iterations has been reached.  Currently the
> function is:
> {code}
>     public final void optimize() throws OptimizerException {
>         RuleMatcher matcher = new RuleMatcher();
>         for (Rule rule : mRules) {
>             if (matcher.match(rule)) {
>                 // It matches the pattern.  Now check if the transformer
>                 // approves as well.
>                 List<List<O>> matches = matcher.getAllMatches();
>                 for (List<O> match:matches)
>                 {
> 	                if (rule.transformer.check(match)) {
> 	                    // The transformer approves.
> 	                    rule.transformer.transform(match);
> 	                }
>                 }
>             }
>         }
>     }
> {code}
> It would change to be:
> {code}
>     public final void optimize() throws OptimizerException {
>         RuleMatcher matcher = new RuleMatcher();
>         boolean sawMatch;
>         int iterators = 0;
>         do {
>             sawMatch = false;
>             for (Rule rule : mRules) {
>                 List<List<O>> matches = matcher.getAllMatches();
>                 for (List<O> match:matches) {
>                     // It matches the pattern.  Now check if the transformer
>                     // approves as well.
>                     if (rule.transformer.check(match)) {
>                         // The transformer approves.
>                         sawMatch = true;
>                         rule.transformer.transform(match);
>                     }
>                 }
>             }
>             // Not sure if 1000 is the right number of iterations, maybe it
>             // should be configurable so that large scripts don't stop too 
>             // early.
>         } while (sawMatch && numIterations++ < 1000);
>     }
> {code}
> The reason for limiting the number of iterations is to avoid infinite loops.  The reason for iterating over the rules is so that each rule can be applied multiple
> times as necessary.  This allows us to write simple rules, mostly swaps between neighboring operators, without worrying that we get the plan right in one pass.
> For example, we might have a plan that looks like:  Load->Join->Filter->Foreach, and we want to optimize it to Load->Foreach->Filter->Join.  With two simple
> rules (swap filter and join and swap foreach and filter), applied iteratively, we can get from the initial to final plan, without needing to understanding the
> big picture of the entire plan.
> 3) Add three calls to OperatorPlan:
> {code}
> /**
>  * Swap two operators in a plan.  Both of the operators must have single
>  * inputs and single outputs.
>  * @param first operator
>  * @param second operator
>  * @throws PlanException if either operator is not single input and output.
>  */
> public void swap(E first, E second) throws PlanException {
>     ...
> }
> /**
>  * Push one operator in front of another.  This function is for use when
>  * the first operator has multiple inputs.  The caller can specify
>  * which input of the first operator the second operator should be pushed to.
>  * @param first operator, assumed to have multiple inputs.
>  * @param second operator, will be pushed in front of first
>  * @param inputNum, indicates which input of the first operator the second
>  * operator will be pushed onto.  Numbered from 0.
>  * @throws PlanException if inputNum does not exist for first operator
>  */
> public void pushBefore(E first, E second, int inputNum) throws PlanException {
>     ...
> }
> /**
>  * Push one operator after another.  This function is for use when the second
>  * operator has multiple outputs.  The caller can specify which output of the
>  * second operator the first operator should be pushed to.
>  * @param first operator, will be pushed after the second operator
>  * @param second operator, assumed to have multiple outputs
>  * @param outputNum indicates which output of the second operator the first 
>  * operator will be pushed onto.  Numbered from 0.
>  * @throws PlanException if outputNum does not exist for second operator
>  */
> public void pushAfter(E first, E second, int outputNum) throws PlanException {
>     ...
> }
> {code}
> The rules in the optimizer can use these three functions, along with the existing insertBetween(), replace(), and removeAndReconnect() calls to operate on the
> plan.
> 4) Add a new call to Operator:
> {code}
> /**
>  * Make any necessary changes to a node based on a change of position in the
>  * plan.  This allows operators to rewire their projections, etc. when they
>  * are relocated in a plan.
>  * @param oldPred Operator that was previously the predecessor.
>  * @param newPred Operator thwas will now be the predecessor.
>  * @throws PlanException
>  */
> public abstract void rewire(Operator oldPred, Operator newPred) throws PlanException;
> {code}
> This method will be called by the swap, pushBefore, pushAfter, insertBetween, replace, and removeAndReconnect in OperatorPlan whenever an operator is moved
> around so that the operator has a chance to make any necessary changes.  
> 5) Add new calls to LogicalOperator and PhysicalOperator
> {code}
> /**
>  * A struct detailing how a projection is altered by an operator.
>  */
> public class ProjectionMap {
>     /**
>      * Quick way for an operator to note that its input and output are the
>      * same.
>      */
>     public boolean noChange;
>     /**
>      * Map of field changes, with keys being the output fields of the 
>      * operator and values being the input fields.  Fields are numbered from
>      * 0.  So for a foreach operator derived from
>      * 'B = foreach A generate $0, $2, $3, udf($1)' 
>      * would produce a mapping of 0->0, 1->2, 2->3
>      */
>     public Map<Integer, Integer> mappedFields;
>     /**
>      * List of fields removed from the input.  This includes fields that were
>      * transformed, and thus are no longer the same fields.  Using the
>      * example foreach given under mappedFields, this list would contain '1'.
>      */
>     public List<Integer> removedFields;
>     /**
>      * List of fields in the output of this operator that were created by this
>      * operator.  Using the example foreach given under mappedFields, this list
>      * would contain '3'.
>      */
>     public List<Integer> addedFields;
> }
> /**
>  * Produce a map describing how this operator modifies its projection.
>  * @returns ProjectionMap null indicates it does not know how the projection
>  * changes, for example a join of two inputs where one input does not have
>  * a schema.
>  */
> public abstract ProjectionMap getProjectionMap();
> /**
>  * Get a list of fields that this operator requires.  This is not necessarily
>  * equivalent to the list of fields the operator projects.  For example,
>  * a filter will project anything passed to it, but requires only the fields
>  * explicitly referenced in its filter expression.
>  * @return list of fields, numbered from 0.
>  */
> public abstract List<Integer> getRequiredFields();
> {code}
> These calls will be called by optimizer rules to determine whether or not a swap can be done (for example, you can't swap two operators if the second one uses a
> field added by the first), and once the swap is done they will be used by rewire to understand how to map projections in the operators.
> 6)  It's not clear that the RuleMatcher, in its current form, will work with rules that are not linear.  That is, it matches rules that look like:
> Operators {Foreach, Filter}
> Edges {0->1}
> But I don't know if it will match rules that look like:
> Operators {Scan, Scan, Join}
> Edges {0->2, 1->2}
> For the optimizer to be able to determine join types and operations with splits, it will have to be able to do that.
> Examples of types of rules that is optimizer could support:
> 1) Pushing filters in front of joins.
> 2) Pushing foreachs with flattens (which thus greathly expand the data) down the tree past filters, joins, etc.
> 3) Pushing type casting used for schemas in loads down to the point where the field is actually used.
> 4) Deciding when to do fragment/replicate join or sort/merge join instead of the standard hash join.
> 5) The current optimizations:  pushing limit up the tree, making implicit splits explicit, merge load and stream where possible, using the combiner.
> 6) Merge filters or foreachs where possible
> In particular the combiner optimizer hopefully can be completely rewritten to use the optimizer framework to make decisions about how to rework physical plans
> to push work into the combiner.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.