You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/12/19 04:58:20 UTC

svn commit: r1050757 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ test/org/apache/pig/test/

Author: thejas
Date: Sun Dec 19 03:58:20 2010
New Revision: 1050757

URL: http://svn.apache.org/viewvc?rev=1050757&view=rev
Log:
PIG-750: Use combiner when algebraic UDFs are used in expressions 
PIG-490: Combiner not used when group elements referred to in 
  tuple notation instead of flatten. 


Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    pig/trunk/test/org/apache/pig/test/TestCombiner.java
    pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1050757&r1=1050756&r2=1050757&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Dec 19 03:58:20 2010
@@ -24,6 +24,11 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-750: Use combiner when algebraic UDFs are used in expressions (thejas)
+
+PIG-490: Combiner not used when group elements referred to in 
+  tuple notation instead of flatten. (thejas)
+
 PIG-1768: 09 docs: illustrate (changec via olgan)
 
 PIG-1768: docs reorg (changec via olgan)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=1050757&r1=1050756&r2=1050757&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Sun Dec 19 03:58:20 2010
@@ -18,7 +18,9 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +33,7 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
@@ -54,46 +57,29 @@ import org.apache.pig.impl.plan.PlanWalk
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.util.Pair;
 
 /**
  * Optimize map reduce plans to use the combiner where possible.
- * Currently Foreach is copied to the combiner phase if it does not contain a
- * nested plan and all UDFs in the generate statement are algebraic.
- * The version of the foreach in the combiner
- * stage will use the initial function, and the version in the reduce stage
- * will be changed to use the final function.
- *
+ * Algebraic functions and distinct in nested plan of a foreach are partially 
+ * computed in the map and combine phase.
+ * A new foreach statement with initial and intermediate forms of algebraic
+ * functions are added to map and combine plans respectively. 
+ * 
+ * If bag portion of group-by result is projected or a non algebraic 
+ * expression/udf has bag as input, combiner will not be used. This is because 
+ * the use of combiner in such case is likely to degrade performance 
+ * as there will not be much reduction in data size in combine stage to 
+ * offset the cost of the additional number of times (de)serialization is done.
+ * 
+ * 
  * Major areas for enhancement:
- * 1) Currently, scripts such as:
- *     B = group A by $0;
- *     C = foreach B {
- *         C1 = distinct A;
- *         generate group, COUNT(C1);
- *     }
- * do not use the combiner.  The issue is being able to properly decompose
- * the expression in the UDF's plan.  The current code just takes whatever is
- * the argument to the algebraic UDF and replaces it with a project.  This
- * works for things like generate group, SUM(A.$1 + 1).  But it fails for
- * things like the above.  Certain types of inner plans will never be
- * movable (like filters).  But distinct or order by in the inner plan
- * should be mobile.  And, things like:
- *      C = cogroup A by $0, B by $0;
- *      D = foreach C {
- *          D1 = distinct A;
- *          D2 = distinct B;
- *          generate UDF(D1 + D2);
- *      }
- * make it even harder.  The first step is probably just to handle queries
- * like the first above, as they will probably be the most common.
+ * 1. use of combiner in cogroup
+ * 2. queries with order-by, limit or sort in a nested foreach after group-by
+ * 3. case where group-by is followed by filter that has algebraic expression
  *
- * 2) Scripts such as:
- *     B = group A by $0;
- *     C = foreach B generate algebraic(A), nonalgebraic(A);
- * currently aren't moved into the combiner, even though they could be.
- * Again, the trick here is properly decomposing the plan since A may be more
- * than a simply projection.
+ * 
  *
- * #2 should probably be the next area of focus.
  *
  */
 public class CombinerOptimizer extends MROpPlanVisitor {
@@ -102,18 +88,7 @@ public class CombinerOptimizer extends M
 
     private Log log = LogFactory.getLog(getClass());
 
-    private enum ExprType { SIMPLE_PROJECT, ALGEBRAIC, NOT_ALGEBRAIC,
-        DISTINCT };
-
-    private int mKeyField = -1;
-    
-    // This array tracks the positions of the group key in the output tuples 
-    // of the foreach clause. This needs to be revisited when combiner optimizer
-    // supports foreach output with parts of group key (e.g. group.$0).
-    private boolean[] keyFieldPositions;
 
-    private byte mKeyType = 0;
-    
     private CompilationMessageCollector messageCollector = null;
 
     public CombinerOptimizer(MROperPlan plan, String chunkSize) {
@@ -122,21 +97,21 @@ public class CombinerOptimizer extends M
     }
 
     public CombinerOptimizer(MROperPlan plan, String chunkSize, 
-    		CompilationMessageCollector messageCollector) {
+            CompilationMessageCollector messageCollector) {
         super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
         this.messageCollector = messageCollector ; 
     }
-    
+
     public CompilationMessageCollector getMessageCollector() {
-    	return messageCollector;
+        return messageCollector;
     }
 
     @Override
     public void visitMROp(MapReduceOper mr) throws VisitorException {
-        resetState();
         log.trace("Entering CombinerOptimizer.visitMROp");
         if (mr.reducePlan.isEmpty()) return;
 
+        // part one - check if this MR job represents a group-by + foreach
         // Find the POLocalRearrange in the map.  I'll need it later.
         List<PhysicalOperator> mapLeaves = mr.mapPlan.getLeaves();
         if (mapLeaves == null || mapLeaves.size() != 1) {
@@ -151,7 +126,7 @@ public class CombinerOptimizer extends M
 
         List<PhysicalOperator> reduceRoots = mr.reducePlan.getRoots();
         if (reduceRoots.size() != 1) {
-        	messageCollector.collect("Expected reduce to have single leaf", MessageType.Warning, PigWarning.MULTI_LEAF_REDUCE);
+            messageCollector.collect("Expected reduce to have single leaf", MessageType.Warning, PigWarning.MULTI_LEAF_REDUCE);
             return;
         }
 
@@ -159,7 +134,7 @@ public class CombinerOptimizer extends M
         // not, I don't know what's going on, so I'm out of here.
         PhysicalOperator root = reduceRoots.get(0);
         if (!(root instanceof POPackage)) {
-        	messageCollector.collect("Expected reduce root to be a POPackage", MessageType.Warning, PigWarning.NON_PACKAGE_REDUCE_PLAN_ROOT);
+            messageCollector.collect("Expected reduce root to be a POPackage", MessageType.Warning, PigWarning.NON_PACKAGE_REDUCE_PLAN_ROOT);
             return;
         }
         POPackage pack = (POPackage)root;
@@ -169,7 +144,6 @@ public class CombinerOptimizer extends M
         if (packSuccessors == null || packSuccessors.size() != 1) return;
         PhysicalOperator successor = packSuccessors.get(0);
 
-        // Need to check if this is a distinct.
         if (successor instanceof POFilter) {
             /*
                Later
@@ -184,128 +158,384 @@ public class CombinerOptimizer extends M
             } else if (algebraic(filterInner)) {
                 // TODO Duplicate filter to combiner
             }
-            */
+             */
         } else if (successor instanceof POForEach) {
             POForEach foreach = (POForEach)successor;
             List<PhysicalPlan> feInners = foreach.getInputPlans();
-            List<ExprType> ap = algebraic(feInners, foreach.getToBeFlattened());
-            if (ap != null) {
-                log.info("Choosing to move algebraic foreach to combiner");
-
-                // Need to insert two new foreachs - one  in the combine
-				// and one in the map plan which will be based on the reduce foreach.
-				// The map foreach will have one inner plan for each 
-				// inner plan in the foreach we're duplicating.  For 
-				// projections, the plan will be the same.  For algebraic 
-				// udfs, the plan will have the initial version of the function.
-				
-				// The combine foreach will have one inner plan for each 
-				// inner plan in the foreach we're duplicating.  For 
-				// projections, the project operators will be changed to
-				// project the same column as its position in the
-				// foreach. For algebraic udfs, the plan will have the 
-				// intermediate version of the function. The input to the
-				// udf will be a POProject which will project the column
-				// corresponding to the position of the udf in the foreach
-				
-			    // In the inner plans of the reduce foreach for 	
-				// projections, the project operators will be changed to
-				// project the same column as its position in the
-				// foreach. For algebraic udfs, the plan will have the 
-				// final version of the function. The input to the
-				// udf will be a POProject which will project the column
-				// corresponding to the position of the udf in the foreach
-                if (mr.combinePlan.getRoots().size() != 0) {
-                	messageCollector.collect("Wasn't expecting to find anything already "
+            
+            // find algebraic operators and also check if the foreach statement
+            // is suitable for combiner use
+            List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = 
+                findAlgebraicOps(feInners);
+            if(algebraicOps == null || algebraicOps.size() == 0){
+                // the plan is not  combinable or there is nothing to combine
+                //we're done
+                return;
+            }
+            if (mr.combinePlan.getRoots().size() != 0) {
+                messageCollector.collect("Wasn't expecting to find anything already "
                         + "in the combiner!", MessageType.Warning, PigWarning.NON_EMPTY_COMBINE_PLAN);
-                    return;
+                return;
+            }
+
+            log.info("Choosing to move algebraic foreach to combiner");
+
+            try {
+
+
+                // replace PODistinct->Project[*] with distinct udf (which is Algebraic)
+                for(Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps ){
+                    if(! (op2plan.first instanceof PODistinct))
+                        continue;
+                    DistinctPatcher distinctPatcher = new DistinctPatcher(op2plan.second);
+                    distinctPatcher.visit();
+                    if(distinctPatcher.getDistinct() == null){
+                        int errCode = 2073;
+                        String msg = "Problem with replacing distinct operator with distinct built-in function.";
+                        throw new PlanException(msg, errCode, PigException.BUG);
+                    }
+                    op2plan.first = distinctPatcher.getDistinct();
                 }
-                mr.combinePlan = new PhysicalPlan();
-                try {
-                    // If we haven't already found the key (and thus the
-                    // key type) we need to figure out the key type now.
-                    if (mKeyType == 0) {
-                        mKeyType = rearrange.getKeyType();
+
+                //create new map foreach
+                POForEach mfe = createForEachWithGrpProj(foreach, rearrange.getKeyType());                
+                Map<PhysicalOperator, Integer> op2newpos = 
+                    new HashMap<PhysicalOperator, Integer>();
+                Integer pos = 1;
+                //create plan for each algebraic udf and add as inner plan in map-foreach 
+                for(Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps ){
+                    PhysicalPlan udfPlan = createPlanWithPredecessors(op2plan.first, op2plan.second);
+                    mfe.addInputPlan(udfPlan, false);
+                    op2newpos.put(op2plan.first, pos++);
+                }
+                changeFunc(mfe, POUserFunc.INITIAL);
+
+                // since we will only be creating SingleTupleBag as input to
+                // the map foreach, we should flag the POProjects in the map
+                // foreach inner plans to also use SingleTupleBag
+                for (PhysicalPlan mpl : mfe.getInputPlans()) {
+                    try {
+                        new fixMapProjects(mpl).visit();
+                    } catch (VisitorException e) {
+                        int errCode = 2089;
+                        String msg = "Unable to flag project operator to use single tuple bag.";
+                        throw new PlanException(msg, errCode, PigException.BUG, e);
                     }
+                }
+
+                //create new combine foreach
+                POForEach cfe = createForEachWithGrpProj(foreach, rearrange.getKeyType());
+                //add algebraic functions with appropriate projection
+                addAlgebraicFuncToCombineFE(cfe, op2newpos);
+                changeFunc(cfe, POUserFunc.INTERMEDIATE);
+
+                //fix projection and function type for algebraic functions in reduce foreach
+                for(Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps ){
+                    setProjectInput(op2plan.first, op2plan.second, op2newpos.get(op2plan.first));
+                    ((POUserFunc)op2plan.first).setAlgebraicFunction(POUserFunc.FINAL);
+                }
+
+
+                // we have modified the foreach inner plans - so set them
+                // again for the foreach so that foreach can do any re-initialization
+                // around them.
+                // FIXME - this is a necessary evil right now because the leaves are explicitly
+                // stored in the POForeach as a list rather than computed each time at 
+                // run time from the plans for optimization. Do we want to have the Foreach
+                // compute the leaves each time and have Java optimize it (will Java optimize?)?
+                mfe.setInputPlans(mfe.getInputPlans());
+                cfe.setInputPlans(cfe.getInputPlans());
+                foreach.setInputPlans(foreach.getInputPlans());
+
+                //tell POCombinerPackage which fields need projected and
+                // which placed in bags. First field is simple project
+                // rest need to go into bags
+                int numFields = algebraicOps.size() + 1; // algebraic funcs + group key
+                boolean[] bags = new boolean[numFields];
+                bags[0] = false;
+                for (int i = 1; i < numFields; i++) {
+                    bags[i] = true;
+                }
+
+                // Use the POCombiner package in the combine plan
+                // as it needs to act differently than the regular
+                // package operator.
+                mr.combinePlan = new PhysicalPlan();
+                POCombinerPackage combinePack =
+                    new POCombinerPackage(pack, bags);
+                mr.combinePlan.add(combinePack);
+                mr.combinePlan.add(cfe);
+                mr.combinePlan.connect(combinePack, cfe);
+                // No need to connect projections in cfe to cp, because
+                // PigCombiner directly attaches output from package to
+                // root of remaining plan.
+
+                POLocalRearrange mlr = getNewRearrange(rearrange);
+
+
+                // A specialized local rearrange operator will replace
+                // the normal local rearrange in the map plan. This behaves
+                // like the regular local rearrange in the getNext() 
+                // as far as getting its input and constructing the 
+                // "key" out of the input. It then returns a tuple with
+                // two fields - the key in the first position and the
+                // "value" inside a bag in the second position. This output
+                // format resembles the format out of a Package. This output
+                // will feed to the map foreach which expects this format.
+                // If the key field isn't in the project of the combiner or map foreach,
+                // it is added to the end (This is required so that we can 
+                // set up the inner plan of the new Local Rearrange leaf in the map
+                // and combine plan to contain just the project of the key).
+                patchUpMap(mr.mapPlan, getPreCombinerLR(rearrange), mfe, mlr);
+                POLocalRearrange clr = getNewRearrange(rearrange);
+
+                mr.combinePlan.add(clr);
+                mr.combinePlan.connect(cfe, clr);
+
+                // Change the package operator in the reduce plan to
+                // be the POCombiner package, as it needs to act
+                // differently than the regular package operator.
+                POCombinerPackage newReducePack =
+                    new POCombinerPackage(pack, bags);
+                mr.reducePlan.replace(pack, newReducePack);
+
+                // the replace() above only changes
+                // the plan and does not change "inputs" to 
+                // operators
+                // set up "inputs" for the operator after
+                // package correctly
+                List<PhysicalOperator> packList = new ArrayList<PhysicalOperator>();
+                packList.add(newReducePack);
+                List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(newReducePack);
+                // there should be only one successor to package
+                sucs.get(0).setInputs(packList);
+            } catch (Exception e) {
+                int errCode = 2018;
+                String msg = "Internal error. Unable to introduce the combiner for optimization.";
+                throw new OptimizerException(msg, errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+
+    /**
+     * find algebraic operators and also check if the foreach statement
+     *  is suitable for combiner use
+     * @param feInners inner plans of foreach
+     * @return null if plan is not combinable, otherwise list of combinable operators
+     * @throws VisitorException
+     */
+    private List<Pair<PhysicalOperator, PhysicalPlan>> 
+    findAlgebraicOps(List<PhysicalPlan> feInners)
+    throws VisitorException {
+        ArrayList<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = new ArrayList<Pair<PhysicalOperator, PhysicalPlan>>();
+
+        //check each foreach inner plan
+        for(PhysicalPlan pplan : feInners){
+            //check for presence of non combinable operators
+            AlgebraicPlanChecker algChecker = new AlgebraicPlanChecker(pplan);
+            algChecker.visit();
+            if(algChecker.sawNonAlgebraic){
+                return null;
+            }
+
+            //if we found a combinable distinct add that to list
+            if(algChecker.sawDistinctAgg){
+                algebraicOps.add(new Pair<PhysicalOperator, PhysicalPlan>(algChecker.getDistinct(), pplan));
+                continue;
+            }
+
 
-                    POForEach mfe = foreach.clone();
-                    POForEach cfe = foreach.clone();
-                    fixUpForeachs(mfe, cfe, foreach, ap);
-                    
-                    
-                    // Use the ExprType list returned from algebraic to tell
-                    // POCombinerPackage which fields need projected and
-                    // which placed in bags.
-                    int numFields = (mKeyField >= ap.size()) ? mKeyField + 1 :
-                        ap.size();
-                    boolean[] bags = new boolean[numFields];
-                    for (int i = 0; i < ap.size(); i++) {
-                        if (ap.get(i) == ExprType.SIMPLE_PROJECT) bags[i] = false;
-                        else bags[i] = true;
+            List<PhysicalOperator> roots = pplan.getRoots();
+            //combinable operators have to be attached to POProject root(s)  
+            // if root does not have a successor that is combinable, the project 
+            // has to be projecting the group column . Otherwise this MR job
+            //is considered not combinable as we don't want to use combiner for
+            // cases where this foreach statement is projecting bags (likely to 
+            // be bad for performance because of additional (de)serialization costs)
+
+            for(PhysicalOperator root : roots){
+                if(root instanceof ConstantExpression){
+                    continue;
+                }
+                if(! (root  instanceof POProject)){
+                    // how can this happen? - expect root of inner plan to be 
+                    // constant or project.  not combining it
+                    //TODO: Warn
+                    return null;
+                }
+                POProject proj = (POProject)root;
+                POUserFunc combineUdf = getAlgebraicSuccessor(proj, pplan);
+                if(combineUdf == null){
+                    // Check to see if this is a projection of the grouping column.
+                    // If so, it will be a projection of col 0 
+                    List<Integer> cols = proj.getColumns();
+                    if (cols != null && cols.size() == 1 && cols.get(0) == 0) {
+                        //it is project of grouping column, so the plan is still
+                        //combinable
+                        continue;
+                    }else{
+                        //not combinable
+                        return null;
                     }
-                    bags[mKeyField] = false;
-					// Use the POCombiner package in the combine plan
-					// as it needs to act differently than the regular
-					// package operator.
-                    POCombinerPackage combinePack =
-                        new POCombinerPackage(pack, bags, keyFieldPositions);
-                    mr.combinePlan.add(combinePack);
-                    mr.combinePlan.add(cfe);
-                    mr.combinePlan.connect(combinePack, cfe);
-                    // No need to connect projections in cfe to cp, because
-                    // PigCombiner directly attaches output from package to
-                    // root of remaining plan.
-                    
-                    POLocalRearrange mlr = rearrange.clone();
-                    fixUpRearrange(mlr);
-
-                    // A specialized local rearrange operator will replace
-                    // the normal local rearrange in the map plan. This behaves
-                    // like the regular local rearrange in the getNext() 
-                    // as far as getting its input and constructing the 
-                    // "key" out of the input. It then returns a tuple with
-                    // two fields - the key in the first position and the
-                    // "value" inside a bag in the second position. This output
-                    // format resembles the format out of a Package. This output
-                    // will feed to the map foreach which expects this format.
-                    // If the key field isn't in the project of the combiner or map foreach,
-                    // it is added to the end (This is required so that we can 
-                    // set up the inner plan of the new Local Rearrange leaf in the map
-                    // and combine plan to contain just the project of the key).
-                    patchUpMap(mr.mapPlan, getPreCombinerLR(rearrange), mfe, mlr);
-                    POLocalRearrange clr = rearrange.clone();
-                    fixUpRearrange(clr);
-
-                    mr.combinePlan.add(clr);
-                    mr.combinePlan.connect(cfe, clr);
-                    
-                    // Change the package operator in the reduce plan to
-                    // be the POCombiner package, as it needs to act
-                    // differently than the regular package operator.
-                    POCombinerPackage newReducePack =
-                        new POCombinerPackage(pack, bags, keyFieldPositions);
-                    mr.reducePlan.replace(pack, newReducePack);
-                    
-                    // the replace() above only changes
-                    // the plan and does not change "inputs" to 
-                    // operators
-                    // set up "inputs" for the operator after
-                    // package correctly
-                    List<PhysicalOperator> packList = new ArrayList<PhysicalOperator>();
-                    packList.add(newReducePack);
-                    List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(newReducePack);
-                    // there should be only one successor to package
-                    sucs.get(0).setInputs(packList);
-                } catch (Exception e) {
-                    int errCode = 2018;
-                    String msg = "Internal error. Unable to introduce the combiner for optimization.";
-                    throw new OptimizerException(msg, errCode, PigException.BUG, e);
                 }
+
+                algebraicOps.add(new Pair<PhysicalOperator, PhysicalPlan>(combineUdf, pplan));
+
             }
         }
+
+        return algebraicOps;
     }
 
     /**
+     * Look for an algebraic POUserFunc as successor to this project, called
+     * recursively to skip any other projects seen on the way.  
+     * @param proj project
+     * @param pplan physical plan
+     * @return null if any operator other than POProject or algebraic POUserFunc is
+     * found while going down the plan, otherwise algebraic POUserFunc is returned
+     */
+    private POUserFunc getAlgebraicSuccessor(POProject proj, PhysicalPlan pplan) {
+        //check if root is followed by combinable operator
+        List<PhysicalOperator> succs = pplan.getSuccessors(proj);
+        if(succs == null || succs.size() == 0){
+            return null;
+        }
+        if(succs.size() > 1){
+            //project shared by more than one operator - does not happen 
+            // in plans generated today
+            // won't try to combine this
+            return null;
+        }
+
+
+        PhysicalOperator succ = succs.get(0);
+        if(succ instanceof POProject){
+            return getAlgebraicSuccessor((POProject) succ, pplan);
+        }
+
+        if(succ instanceof POUserFunc && ((POUserFunc)succ).combinable() ){
+            return (POUserFunc)succ;
+        }
+
+        //some other operator ? can't combine
+        return null;
+    }
+    
+
+    /**
+     * Create a new foreach with same scope,alias as given foreach
+     * add an inner plan that projects the group column, which is going to be
+     * the first input
+     * @param foreach source foreach
+     * @param keyType type for group-by key
+     * @return new POForeach
+     */
+    private POForEach createForEachWithGrpProj(POForEach foreach, byte keyType) {
+        String scope = foreach.getOperatorKey().scope;
+        POForEach newFE = new POForEach(createOperatorKey(scope), new ArrayList<PhysicalPlan>());
+        newFE.setAlias(foreach.getAlias());
+        newFE.setResultType(foreach.getResultType());
+        //create plan that projects the group column 
+        PhysicalPlan grpProjPlan = new PhysicalPlan();
+        //group by column is the first column
+        POProject proj = new POProject(createOperatorKey(scope), 1, 0);
+        proj.setResultType(keyType);
+        grpProjPlan.add(proj);
+
+        newFE.addInputPlan(grpProjPlan, false);
+        return newFE;
+    }
+    
+    /**
+     * Create new plan and  add to it the clones of operator algeOp  and its 
+     * predecessors from the physical plan pplan .
+     * @param algeOp algebraic operator 
+     * @param pplan physical plan that has algeOp
+     * @return new plan
+     * @throws CloneNotSupportedException
+     * @throws PlanException
+     */
+    private PhysicalPlan createPlanWithPredecessors(PhysicalOperator algeOp, PhysicalPlan pplan)
+    throws CloneNotSupportedException, PlanException {
+        PhysicalPlan newplan = new PhysicalPlan();
+        addPredecessorsToPlan(algeOp, pplan, newplan);
+        return newplan;
+    }
+
+    /**
+     * Recursively clone op and its predecessors from pplan and add them to newplan
+     * @param op
+     * @param pplan
+     * @param newplan
+     * @return
+     * @throws CloneNotSupportedException
+     * @throws PlanException
+     */
+    private PhysicalOperator addPredecessorsToPlan(PhysicalOperator op, PhysicalPlan pplan,
+            PhysicalPlan newplan)
+    throws CloneNotSupportedException, PlanException {
+        PhysicalOperator newOp = op.clone();
+        newplan.add(newOp);
+        if(pplan.getPredecessors(op) == null || pplan.getPredecessors(op).size() == 0){
+            return newOp;
+        }        
+        for(PhysicalOperator pred : pplan.getPredecessors(op)){
+            PhysicalOperator newPred = addPredecessorsToPlan(pred, pplan, newplan);
+            newplan.connect(newPred, newOp);
+        }
+        return newOp;
+    }
+    
+
+
+
+    /**
+     * add algebraic functions with appropriate projection to new foreach in combiner
+     * @param cfe - the new foreach in combiner 
+     * @param op2newpos - mapping of physical operator to position in input
+     * @throws CloneNotSupportedException
+     * @throws PlanException
+     */
+    private void addAlgebraicFuncToCombineFE(POForEach cfe, Map<PhysicalOperator, Integer> op2newpos)
+    throws CloneNotSupportedException, PlanException {
+
+        //an array that we will first populate with physical operators in order 
+        //of their position in input. Used while adding plans to combine foreach
+        // just so that output of combine foreach same positions as input. That
+        // means the same operator to position mapping can be used by reduce as well
+        PhysicalOperator[] opsInOrder = new PhysicalOperator[op2newpos.size() + 1];
+        for(Map.Entry<PhysicalOperator, Integer> op2pos : op2newpos.entrySet()){
+            opsInOrder[op2pos.getValue()] = op2pos.getKey();
+        }
+
+        // first position is used by group column and a plan has been added for it,
+        //so start with 1
+        for(int i=1; i < opsInOrder.length; i++){
+            //create new inner plan for foreach
+            //add cloned copy of given physical operator and a new project.
+            // Even if the udf in query takes multiple input, only one project
+            // needs to be added because input to this udf
+            //will be the INITIAL version of udf evaluated in map. 
+            PhysicalPlan newPlan = new PhysicalPlan();
+            PhysicalOperator newOp = opsInOrder[i].clone();
+            newPlan.add(newOp);
+            POProject proj = new POProject(
+                    createOperatorKey(cfe.getOperatorKey().getScope()),
+                    1, i
+            );
+            proj.setResultType(DataType.BAG);
+            newPlan.add(proj);
+            newPlan.connect(proj, newOp);
+            cfe.addInputPlan(newPlan, false);
+        }
+    }
+
+    /**
+     * Replace old POLocalRearrange with new pre-combine LR,
+     * add new map foreach, new map-local-rearrange, and connect them
+     * 
      * @param mapPlan
      * @param preCombinerLR
      * @param mfe
@@ -314,13 +544,13 @@ public class CombinerOptimizer extends M
      */
     private void patchUpMap(PhysicalPlan mapPlan, POPreCombinerLocalRearrange preCombinerLR,
             POForEach mfe, POLocalRearrange mlr) throws PlanException {
-        
+
         POLocalRearrange oldLR = (POLocalRearrange)mapPlan.getLeaves().get(0);
         mapPlan.replace(oldLR, preCombinerLR);
-        
+
         mapPlan.add(mfe);
         mapPlan.connect(preCombinerLR, mfe);
-        
+
         mapPlan.add(mlr);
         mapPlan.connect(mfe, mlr);
     }
@@ -330,305 +560,19 @@ public class CombinerOptimizer extends M
      * @return
      */
     private POPreCombinerLocalRearrange getPreCombinerLR(POLocalRearrange rearrange) {
-        
+
         String scope = rearrange.getOperatorKey().scope;
         POPreCombinerLocalRearrange pclr = new POPreCombinerLocalRearrange(
-                new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)),
+                createOperatorKey(scope),
                 rearrange.getRequestedParallelism(), rearrange.getInputs());
         pclr.setPlans(rearrange.getPlans());
         return pclr;
     }
 
-    /*
-    private boolean onKeysOnly(PhysicalPlan pp) {
-        // TODO
-        return false;
-    }
-    */
-
-    // At some point in the future we can expand to deconstructing
-    // non-algebraic expressions to find an algebraic or projected root.  For
-    // example, given a query: 
-    // foreach b generate group, algebraic(a), nonalgebraic(a)
-    // this could be transformed to:
-    // combiner: foreach group, initial(a), a
-    // reducer: foreach group, final(a), nonalgebraic(a)
-    // This code doesn't do this now, because deconstructing expressions is
-    // tricky.
-    private List<ExprType> algebraic(
-            List<PhysicalPlan> plans,
-            List<Boolean> flattens) throws VisitorException {
-        List<ExprType> types = new ArrayList<ExprType>(plans.size());
-        boolean atLeastOneAlgebraic = false;
-        boolean noNonAlgebraics = true;
-        keyFieldPositions = new boolean[plans.size()];
-        for (int i = 0; i < plans.size(); i++) {
-            ExprType t = algebraic(plans.get(i), flattens.get(i), i);
-            types.add(t);
-            atLeastOneAlgebraic |= 
-                (t == ExprType.ALGEBRAIC || t == ExprType.DISTINCT);
-            noNonAlgebraics &= (t != ExprType.NOT_ALGEBRAIC);
-        }
-        if (!atLeastOneAlgebraic || !noNonAlgebraics) return null;
-        else return types;
-    }
-
-    private ExprType algebraic(
-            PhysicalPlan pp,
-            Boolean toBeFlattened,
-            int field) throws VisitorException {
-        // A plan will be considered algebraic if
-        // each element is a single field OR an algebraic UDF  
-        List<PhysicalOperator> leaves = pp.getLeaves();
-        if (leaves == null || leaves.size() != 1) {
-            // Don't know what this is, but it isn't algebraic
-            return ExprType.NOT_ALGEBRAIC;
-        }
-                
-        // Check that it doesn't have anything in the nested plan that I
-        // can't make algebraic.  At this point this is just filters and
-        // foreach.  Filters are left out because they are not necessarily
-        // algebraic.  Foreach is left out because it's difficult to patch
-        // up the plan properly around them.  This is an area for future
-        // enhancement.
-        AlgebraicPlanChecker apc = new AlgebraicPlanChecker(pp);
-        apc.visit();
-        if (apc.sawNonAlgebraic) return ExprType.NOT_ALGEBRAIC;
-        if(apc.sawDistinctAgg) return ExprType.DISTINCT;
-        
-        // we did not see a Non algebraic or a distinct so far
-        // proceed to check leaves
-        PhysicalOperator leaf = leaves.get(0);
-        if (leaf instanceof POProject) {
-            POProject proj = (POProject)leaf;
-            // Check that it's a simple project.  We can't currently handle
-            // things like group.$0, because that requires resetting types on
-            // the reduce side.
-            if (pp.getPredecessors(proj) != null) return ExprType.NOT_ALGEBRAIC;
-
-            // Check if it's a projection of bag. Currently we can't use combiner 
-            // for statement like c = foreach b generate group, SUM(a), a; 
-            // where a is a bag.
-            if (proj.getResultType() == DataType.BAG) return ExprType.NOT_ALGEBRAIC;
-            
-            // Check to see if this is a projection of the grouping column.
-            // If so, it will be a projection of col 0 and will have no
-            // predecessors (to avoid things like group.$0, which isn't what we
-            // want).
-            List<Integer> cols = proj.getColumns();
-            if (cols != null && cols.size() == 1 && cols.get(0) == 0 &&
-                    pp.getPredecessors(proj) == null) {
-                mKeyField = field;
-                keyFieldPositions[field] = true;
-                mKeyType = proj.getResultType();
-            } else {
-                // It can't be a flatten except on the grouping column
-                if (toBeFlattened) return ExprType.NOT_ALGEBRAIC;
-            }
-            return ExprType.SIMPLE_PROJECT;
-        } else if (leaf instanceof POUserFunc) {
-            
-            POUserFunc userFunc = (POUserFunc)leaf;
-            if(!userFunc.combinable() ){
-                return ExprType.NOT_ALGEBRAIC;
-            }
-            // The leaf userFunc may be combinable, but there might be other 
-            // algebraic userFuncs in the predecessors, if there are
-            // we choose not to fire combiner.
-            CheckCombinableUserFunc ccuf = new CheckCombinableUserFunc(pp);
-            ccuf.visit();
-            return ccuf.exprType;
-        } else {
-            return ExprType.NOT_ALGEBRAIC;
-        }
-    }
-
-      private static class CheckCombinableUserFunc extends PhyPlanVisitor{
-
-        private ExprType exprType = ExprType.ALGEBRAIC;
-          
-        public CheckCombinableUserFunc(PhysicalPlan plan) {
-            super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
-        }
-        
-        @Override
-        public void visit() throws VisitorException {
-            super.visit();
-        }
-         
-        @Override
-        public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
-            
-            /* We already know there is one combinable POUserFunc and its a leaf. So,  
-             * successor of that userFunc is null. We are interested to find
-             * if there is another combinable userFunc somewhere in plan (that 
-             * is a userFunc with successors and is Combinable).
-             */
-            List<PhysicalOperator> succs = this.mPlan.getSuccessors(userFunc);
-            
-            if(succs != null && !succs.isEmpty() && userFunc.combinable()){
-                this.exprType = ExprType.NOT_ALGEBRAIC;                
-            }
-        }
-      }
-    
-    // Returns number of fields that this will project, including the added
-    // key field if that is necessary
-    private void fixUpForeachs(
-            POForEach mfe, // map foreach
-            POForEach cfe, // combiner foreach
-            POForEach rfe, // reducer foreach
-            List<ExprType> exprs) throws PlanException {
-        List<PhysicalPlan> mPlans = mfe.getInputPlans();
-        List<PhysicalPlan> cPlans = cfe.getInputPlans();
-        List<PhysicalPlan> rPlans = rfe.getInputPlans();
-        for (int i = 0; i < exprs.size(); i++) {
-            if (exprs.get(i) == ExprType.ALGEBRAIC) {
-                changeFunc(mfe, mPlans.get(i), POUserFunc.INITIAL);
-                changeFunc(cfe, cPlans.get(i), POUserFunc.INTERMEDIATE);
-                changeFunc(rfe, rPlans.get(i), POUserFunc.FINAL);
-            } else if (exprs.get(i) == ExprType.DISTINCT) {
-                // A PODistinct in the plan will always have
-                // a Project[bag](*) as its successor.
-                // We will replace it with a POUserFunc with "Distinct" as 
-                // the underlying UDF. 
-                // In the map and combine, we will make this POUserFunc
-                // the leaf of the plan by removing other operators which
-                // are descendants up to the leaf.
-                // In the reduce we will keep descendants intact. Further
-                // down in fixProjectAndInputs we will change the inputs to
-                // this POUserFunc in the combine and reduce plans to be
-                // just projections of the column "i"
-                PhysicalPlan[] plans = new PhysicalPlan[] { 
-                        mPlans.get(i), cPlans.get(i), rPlans.get(i) };
-                byte[] funcTypes = new byte[] { POUserFunc.INITIAL, 
-                        POUserFunc.INTERMEDIATE, POUserFunc.FINAL };
-                for (int j = 0; j < plans.length; j++) {
-                    DistinctPatcher dp = new DistinctPatcher(plans[j]);
-                    try {
-                        dp.visit();
-                    } catch (VisitorException e) {
-                        int errCode = 2073;
-                        String msg = "Problem with replacing distinct operator with distinct built-in function.";
-                        throw new PlanException(msg, errCode, PigException.BUG, e);
-                    }
-                    
-                    
-                    PhysicalOperator leaf = plans[j].getLeaves().get(0);
-                    // make the Distinct POUserFunc the leaf in the map and combine plans.
-                    if( j != plans.length - 1) {
-                        while(!((leaf instanceof POUserFunc) && 
-                                ((POUserFunc)leaf).getFuncSpec().getClassName().startsWith(DISTINCT_UDF_CLASSNAME))) {
-                            plans[j].remove(leaf);
-                            // get the new leaf
-                            leaf = plans[j].getLeaves().get(0);
-                        }
-                        
-                    } 
-                    // Also set the Distinct's function to type Initial in map
-                    // to type Intermediate in combine plan and to type Final in
-                    // the reduce
-                    POUserFunc distinctFunc = (POUserFunc)getDistinctUserFunc(plans[j], leaf);
-                    try {
-                        distinctFunc.setAlgebraicFunction(funcTypes[j]);
-                    } catch (ExecException e) {
-                        int errCode = 2074;
-                        String msg = "Could not configure distinct's algebraic functions in map reduce plan.";
-                        throw new PlanException(msg, errCode, PigException.BUG, e);
-                    }
-                }
-                
-            }
-        }
-        
-        // since we will only be creating SingleTupleBag as input to
-        // the map foreach, we should flag the POProjects in the map
-        // foreach inner plans to also use SingleTupleBag
-        for (PhysicalPlan mpl : mPlans) {
-            try {
-                new fixMapProjects(mpl).visit();
-            } catch (VisitorException e) {
-                int errCode = 2089;
-                String msg = "Unable to flag project operator to use single tuple bag.";
-                throw new PlanException(msg, errCode, PigException.BUG, e);
-            }
-        }
-
-
-        // Set flattens for map and combiner ForEach to false
-        List<Boolean> feFlattens = new ArrayList<Boolean>(cPlans.size());
-        for (int i = 0; i < cPlans.size(); i++) {
-            feFlattens.add(false);
-        }
-        mfe.setToBeFlattened(feFlattens);
-        cfe.setToBeFlattened(feFlattens);
-
-        // If the key field isn't in the project of the combiner or map foreach, add
-        // it to the end (This is required so that we can set up the inner plan
-        // of the new Local Rearrange in the map and combine plan to contain just the
-        // project of the key).
-        if (mKeyField == -1) {
-            addKeyProject(mfe);
-            addKeyProject(cfe);
-            mKeyField = cPlans.size() - 1;
-            keyFieldPositions = new boolean[cPlans.size()];
-            keyFieldPositions[mKeyField] = true;
-        }
-
-        // Change the plans on the reduce/combine foreach to project from the column
-        // they are in ( we just want to take output from the combine and
-        // use that as input in the reduce/combine plan).  UDFs will be left the same but their
-        // inputs altered.  Any straight projections will also be altered.
-        fixProjectAndInputs(cPlans, exprs);
-        fixProjectAndInputs(rPlans, exprs);
-        
-        
-        // we have modified the foreach inner plans - so set them
-        // again for the foreach so that foreach can do any re-initialization
-        // around them.
-        // FIXME - this is a necessary evil right now because the leaves are explicitly
-        // stored in the POForeach as a list rather than computed each time at 
-        // run time from the plans for optimization. Do we want to have the Foreach
-        // compute the leaves each time and have Java optimize it (will Java optimize?)?
-        mfe.setInputPlans(mPlans);
-        cfe.setInputPlans(cPlans);
-        rfe.setInputPlans(rPlans);
+    private OperatorKey createOperatorKey(String scope) {
+        return new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope));
     }
 
-    /**
-     * @param plans
-     * @param exprs 
-     * @throws PlanException 
-     */
-    private void fixProjectAndInputs(List<PhysicalPlan> plans, List<ExprType> exprs) throws PlanException {
-        for (int i = 0; i < plans.size(); i++) {
-                List<PhysicalOperator> leaves = plans.get(i).getLeaves();
-                if (leaves == null || leaves.size() != 1) {
-                    int errCode = 2019;
-                    String msg = "Expected to find plan with single leaf. Found " + leaves.size() + " leaves.";
-                    throw new PlanException(msg, errCode, PigException.BUG);
-                }
-                PhysicalOperator leaf = leaves.get(0);
-                // the combine plan could have an extra foreach inner plan
-                // to project the key - so make sure we check the index
-                // before looking in exprs
-                if(i < exprs.size()  && exprs.get(i) == ExprType.DISTINCT) {
-                    // if there is a distinctagg, we have to
-                    // look for the Distinct POUserFunc and 
-                    // change its input to be a project of
-                    // column "i"
-                    PhysicalOperator op = getDistinctUserFunc(plans.get(i), leaf);
-                    setProjectInput(op, plans.get(i), i);
-                } else {
-                    // Leaf should be either a projection or a UDF
-                    if (leaf instanceof POProject) {
-                        ((POProject)leaf).setColumn(i);
-                    } else if (leaf instanceof POUserFunc) {
-                        setProjectInput(leaf, plans.get(i), i);
-                    }
-              }
-        }
-    }
 
     /**
      * @param op
@@ -639,8 +583,8 @@ public class CombinerOptimizer extends M
     private void setProjectInput(PhysicalOperator op, PhysicalPlan plan, int index) throws PlanException {
         String scope = op.getOperatorKey().scope;
         POProject proj = new POProject(new OperatorKey(scope, 
-            NodeIdGenerator.getGenerator().getNextNodeId(scope)),
-            op.getRequestedParallelism(), index);
+                NodeIdGenerator.getGenerator().getNextNodeId(scope)),
+                op.getRequestedParallelism(), index);
         proj.setResultType(DataType.BAG);
         // Remove old connections and elements from the plan
         plan.trimAbove(op);
@@ -650,80 +594,86 @@ public class CombinerOptimizer extends M
             new ArrayList<PhysicalOperator>(1);
         inputs.add(proj);
         op.setInputs(inputs);
-        
+
     }
 
     /**
-     * @param plan
-     * @param operator
-     * @return
+     * Change the algebraic function type for algebraic functions in map and combine.
+     * In map and combine the algebraic functions will be the leaf of the plan.
+     * @param fe
+     * @param type
+     * @throws PlanException
      */
-    private PhysicalOperator getDistinctUserFunc(PhysicalPlan plan, PhysicalOperator operator) {
-        if(operator instanceof POUserFunc ) { 
-            if(((POUserFunc)operator).getFuncSpec().getClassName().startsWith(DISTINCT_UDF_CLASSNAME)) {
-                return operator;
+    private void changeFunc(POForEach fe, byte type) throws PlanException {
+        for(PhysicalPlan plan : fe.getInputPlans()){
+            List<PhysicalOperator> leaves = plan.getLeaves();
+            if (leaves == null || leaves.size() != 1) {
+                int errCode = 2019;
+                String msg = "Expected to find plan with single leaf. Found " + leaves.size() + " leaves.";
+                throw new PlanException(msg, errCode, PigException.BUG);
+            }
+
+            PhysicalOperator leaf = leaves.get(0);
+            if(leaf instanceof POProject){
+                continue;
+            }
+            if (!(leaf instanceof POUserFunc)) {
+                int errCode = 2020;
+                String msg = "Expected to find plan with UDF or project leaf. Found " + leaf.getClass().getSimpleName();
+                throw new PlanException(msg, errCode, PigException.BUG);
+            }
+
+            POUserFunc func = (POUserFunc)leaf;
+            try {
+                func.setAlgebraicFunction(type);
+            } catch (ExecException e) {
+                int errCode = 2075;
+                String msg = "Could not set algebraic function type.";
+                throw new PlanException(msg, errCode, PigException.BUG, e);
             }
         }
-        return getDistinctUserFunc(plan, plan.getPredecessors(operator).get(0));
-        
     }
 
 
     /**
-     * @param fe
+     * Create a new local rearrange by cloning the existing rearrange and 
+     * add a plan for projecting the key
+     * @param rearrange
+     * @return
+     * @throws PlanException
+     * @throws CloneNotSupportedException
      */
-    private void addKeyProject(POForEach fe) {
-        PhysicalPlan newForEachInnerPlan = new PhysicalPlan();
-        String scope = fe.getOperatorKey().scope;
-        POProject proj = new POProject(new OperatorKey(scope, 
-            NodeIdGenerator.getGenerator().getNextNodeId(scope)), -1, 0);
-        proj.setResultType(mKeyType);
-        newForEachInnerPlan.add(proj);
-        fe.addInputPlan(newForEachInnerPlan, false);
-    }
-
-    private void changeFunc(POForEach fe, PhysicalPlan plan, byte type) throws PlanException {
-        List<PhysicalOperator> leaves = plan.getLeaves();
-        if (leaves == null || leaves.size() != 1) {
-            int errCode = 2019;
-            String msg = "Expected to find plan with single leaf. Found " + leaves.size() + " leaves.";
-            throw new PlanException(msg, errCode, PigException.BUG);
-        }
-
-        PhysicalOperator leaf = leaves.get(0);
-        if (!(leaf instanceof POUserFunc)) {
-            int errCode = 2020;
-            String msg = "Expected to find plan with UDF leaf. Found " + leaf.getClass().getSimpleName();
-            throw new PlanException(msg, errCode, PigException.BUG);
-        }
-        POUserFunc func = (POUserFunc)leaf;
-        try {
-            func.setAlgebraicFunction(type);
-        } catch (ExecException e) {
-            int errCode = 2075;
-            String msg = "Could not set algebraic function type.";
-            throw new PlanException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    private void fixUpRearrange(POLocalRearrange rearrange) throws PlanException {
+    private POLocalRearrange getNewRearrange(POLocalRearrange rearrange)
+    throws PlanException, CloneNotSupportedException {
+        
+        POLocalRearrange newRearrange = rearrange.clone();
+        
         // Set the projection to be the key
         PhysicalPlan newPlan = new PhysicalPlan();
-        String scope = rearrange.getOperatorKey().scope;
+        String scope = newRearrange.getOperatorKey().scope;
         POProject proj = new POProject(new OperatorKey(scope, 
-            NodeIdGenerator.getGenerator().getNextNodeId(scope)), -1,
-            mKeyField);
-        proj.setResultType(mKeyType);
+                NodeIdGenerator.getGenerator().getNextNodeId(scope)), -1, 0);
+        proj.setResultType(newRearrange.getKeyType());
         newPlan.add(proj);
+        
         List<PhysicalPlan> plans = new ArrayList<PhysicalPlan>(1);
         plans.add(newPlan);
-        rearrange.setPlansFromCombiner(plans);
+        newRearrange.setPlansFromCombiner(plans);
+        
+        return newRearrange;
     }
 
+    /**
+     * Checks if there is something that prevents the use of the algebraic interface,
+     * and looks for the PODistinct that can be used as algebraic
+     * 
+     */
     private static class AlgebraicPlanChecker extends PhyPlanVisitor {
         boolean sawNonAlgebraic = false;
         boolean sawDistinctAgg = false;
         private boolean sawForeach = false;
+        private PODistinct distinct = null;
+
 
         AlgebraicPlanChecker(PhysicalPlan plan) {
             super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
@@ -741,9 +691,10 @@ public class CombinerOptimizer extends M
                 sawNonAlgebraic = true;
             }
         }
-        
+
         @Override
         public void visitDistinct(PODistinct distinct) throws VisitorException {
+            this.distinct = distinct;
             if(sawDistinctAgg) {
                 // we want to combine only in the case where there is only
                 // one PODistinct which is the only input to an agg
@@ -787,7 +738,7 @@ public class CombinerOptimizer extends M
             PhysicalOperator leaf = mPlan.getLeaves().get(0);
             // the leaf has to be a POUserFunc (need not be algebraic)
             if(leaf instanceof POUserFunc) {
-                
+
                 // we want to combine only in the case where there is only
                 // one PODistinct which is the only input to an agg.
                 // Do not combine if there are additional inputs.
@@ -796,7 +747,7 @@ public class CombinerOptimizer extends M
                     sawNonAlgebraic = true;
                     return;
                 }
-                
+
                 List<PhysicalOperator> immediateSuccs = mPlan.getSuccessors(distinct);
                 if(immediateSuccs.size() == 1 && immediateSuccs.get(0) instanceof POProject) {
                     if(checkSuccessorIsLeaf(leaf, immediateSuccs.get(0))) { // script 1 above
@@ -813,7 +764,7 @@ public class CombinerOptimizer extends M
                                 }
                             }
                         }
-                        
+
                     }
                 }
             }
@@ -821,7 +772,17 @@ public class CombinerOptimizer extends M
             // the pattern we expected
             sawNonAlgebraic = true;
         }
-        
+
+        /**
+         * @return the distinct
+         */
+        public PODistinct getDistinct() {
+            if(sawNonAlgebraic)
+                return null;
+            return distinct;
+        }
+
+        @Override
         public void visitLimit(POLimit limit) throws VisitorException {
             sawNonAlgebraic = true;
         }
@@ -836,7 +797,7 @@ public class CombinerOptimizer extends M
             }
             return false;
         }
-        
+
         @Override
         public void visitFilter(POFilter filter) throws VisitorException {
             sawNonAlgebraic = true;
@@ -857,7 +818,7 @@ public class CombinerOptimizer extends M
         }
 
     }
-    
+
     /**
      * A visitor to replace   
      * Project[bag][*] 
@@ -868,7 +829,7 @@ public class CombinerOptimizer extends M
      */
     private static class DistinctPatcher extends PhyPlanVisitor {
 
-        public boolean patched = false;
+        private POUserFunc distinct = null;
         /**
          * @param plan
          * @param walker
@@ -884,7 +845,7 @@ public class CombinerOptimizer extends M
         public DistinctPatcher(PhysicalPlan physicalPlan) {
             this(physicalPlan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(physicalPlan));
         }
-        
+
         /* (non-Javadoc)
          * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
          */
@@ -892,13 +853,13 @@ public class CombinerOptimizer extends M
         public void visitProject(POProject proj) throws VisitorException {
             // check if this project is preceded by PODistinct and
             // has the return type bag
-            
-            
+
+
             List<PhysicalOperator> preds = mPlan.getPredecessors(proj);
             if(preds == null) return; // this is a leaf project and so not interesting for patching
             PhysicalOperator pred = preds.get(0);
             if(preds.size() == 1 && pred instanceof PODistinct) {
-                if(patched) {
+                if(distinct != null) {
                     // we should not already have been patched since the
                     // Project-Distinct pair should occur only once
                     int errCode = 2076;
@@ -908,7 +869,9 @@ public class CombinerOptimizer extends M
                 // we have stick in the POUserfunc(org.apache.pig.builtin.Distinct)[DataBag]
                 // in place of the Project-PODistinct pair
                 PhysicalOperator distinctPredecessor = mPlan.getPredecessors(pred).get(0);
-                
+
+                POUserFunc func = null;
+
                 try {
                     String scope = proj.getOperatorKey().scope;
                     List<PhysicalOperator> funcInput = new ArrayList<PhysicalOperator>();
@@ -919,7 +882,7 @@ public class CombinerOptimizer extends M
                     // originally a POForeach with return type BAG - we need to
                     // set it to tuple so we get a stream of tuples. 
                     distinctPredecessor.setResultType(DataType.TUPLE);
-                    POUserFunc func = new POUserFunc(new OperatorKey(scope, 
+                    func = new POUserFunc(new OperatorKey(scope, 
                             NodeIdGenerator.getGenerator().getNextNodeId(scope)),-1, funcInput, fSpec);
                     func.setResultType(DataType.BAG);
                     mPlan.replace(proj, func);
@@ -932,12 +895,17 @@ public class CombinerOptimizer extends M
                     String msg = "Problem with reconfiguring plan to add distinct built-in function.";
                     throw new OptimizerException(msg, errCode, PigException.BUG, e);
                 }
-                patched = true;
+                distinct = func;
             } 
         }
 
+        POUserFunc getDistinct(){
+            return distinct;
+        }
+
+
     }
-    
+
     private static class fixMapProjects extends PhyPlanVisitor {
 
         public fixMapProjects(PhysicalPlan plan) {
@@ -979,11 +947,4 @@ public class CombinerOptimizer extends M
 
     }
 
-    // Reset any member variables since we may have already visited one
-    // combine.
-    private void resetState() {
-        mKeyField = -1;
-        mKeyType = 0;
-        keyFieldPositions = null;
-    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=1050757&r1=1050756&r2=1050757&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Sun Dec 19 03:58:20 2010
@@ -18,15 +18,12 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
@@ -38,8 +35,8 @@ import org.apache.pig.data.NonSpillableD
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.NullableTuple;
-import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.Pair;
 /**
@@ -63,8 +60,6 @@ public class POCombinerPackage extends P
     private boolean[] mBags; // For each field, indicates whether or not it
                              // needs to be put in a bag.
     
-    private boolean[] keyPositions;
-    
     private Map<Integer, Integer> keyLookup;
     
     private int numBags;
@@ -75,10 +70,8 @@ public class POCombinerPackage extends P
      * @param pkg POPackage to clone.
      * @param bags for each field, indicates whether it should be a bag (true)
      * or a simple field (false).
-     * @param keyPos for each field in the output tuple of the foreach operator, 
-     * indicates whether it's the group key.
      */
-    public POCombinerPackage(POPackage pkg, boolean[] bags, boolean[] keyPos) {
+    public POCombinerPackage(POPackage pkg, boolean[] bags) {
         super(new OperatorKey(pkg.getOperatorKey().scope,
             NodeIdGenerator.getGenerator().getNextNodeId(pkg.getOperatorKey().scope)),
             pkg.getRequestedParallelism(), pkg.getInputs());
@@ -96,9 +89,6 @@ public class POCombinerPackage extends P
         for (int i = 0; i < mBags.length; i++) {
             if (mBags[i]) numBags++;            
         }
-        if (keyPos != null) {
-            keyPositions = Arrays.copyOf(keyPos, keyPos.length);
-        }
     }
 
     @Override
@@ -189,10 +179,5 @@ public class POCombinerPackage extends P
         return r;
 
     }
-    
-    @Override
-    public boolean[] getKeyPositionsInTuple() {
-        return keyPositions.clone();
-    }
 
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1050757&r1=1050756&r2=1050757&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Sun Dec 19 03:58:20 2010
@@ -67,13 +67,7 @@ public class POPackage extends PhysicalO
      */
     private static final long serialVersionUID = 1L;
     
-    private static boolean[] SIMPLE_KEY_POSITION; 
 
-    static {
-        SIMPLE_KEY_POSITION = new boolean[1];
-        SIMPLE_KEY_POSITION[0] = true;
-    }
-    
     public static enum PackageType { GROUP, JOIN };
     
     //The iterator of indexed Tuples
@@ -383,18 +377,7 @@ public class POPackage extends PhysicalO
     public void setKeyType(byte keyType) {
         this.keyType = keyType;
     }
-
-    /**
-     * Get the field positions of key in the output tuples.
-     * For POPackage, the position is always 0. The POCombinerPackage,
-     * however, can return different values.
-     * 
-     * @return the field position of key in the output tuples.
-     */
-    public boolean[] getKeyPositionsInTuple() {
-        return SIMPLE_KEY_POSITION.clone();
-    }
-    
+  
     /**
      * Make a deep copy of this operator.  
      * @throws CloneNotSupportedException

Modified: pig/trunk/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCombiner.java?rev=1050757&r1=1050756&r2=1050757&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCombiner.java Sun Dec 19 03:58:20 2010
@@ -181,7 +181,9 @@ public class TestCombiner extends TestCa
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, props);
         pigServer.registerQuery("a = load 'MultiCombinerUseInput.txt' as (x:int);");
         pigServer.registerQuery("b = group a all;");
-        pigServer.registerQuery("c = foreach b generate COUNT(a), SUM(a.$0), MIN(a.$0), MAX(a.$0), AVG(a.$0);");
+        pigServer.registerQuery("c = foreach b generate COUNT(a), SUM(a.$0), " +
+        		"MIN(a.$0), MAX(a.$0), AVG(a.$0), ((double)SUM(a.$0))/COUNT(a.$0)," +
+        		" COUNT(a.$0) + SUM(a.$0) +  MAX(a.$0);");
 
         // make sure there is a combine plan in the explain output
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -196,6 +198,9 @@ public class TestCombiner extends TestCa
         assertEquals(0, t.get(2));
         assertEquals(1, t.get(3));
         assertEquals(0.5, t.get(4));
+        assertEquals(0.5, t.get(5));
+        assertEquals(512000L + 256000L + 1, t.get(6));
+        
         assertFalse(it.hasNext());
         Util.deleteFile(cluster, "MultiCombinerUseInput.txt");
     }
@@ -246,6 +251,79 @@ public class TestCombiner extends TestCa
         Util.deleteFile(cluster, "distinctAggs1Input.txt");
         
     }
+    
+    @Test
+    public void testGroupElements() throws Exception {
+        // test use of combiner when group elements are accessed in the foreach
+        String input[] = {
+                "ABC\t1\ta\t1",
+                "ABC\t1\tb\t2",
+                "ABC\t1\ta\t3",
+                "ABC\t2\tb\t4",
+                "DEF\t1\td\t1",
+                "XYZ\t1\tx\t2"
+        };
+
+        Util.createInputFile(cluster, "testGroupElements.txt", input);
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.registerQuery("a = load 'testGroupElements.txt' as (str:chararray, num1:int, alph : chararray, num2 : int);");
+        pigServer.registerQuery("b = group a by (str, num1);");
+        
+        //check if combiner is present or not for various forms of foreach
+        pigServer.registerQuery("c = foreach b  generate flatten(group), COUNT(a.alph), SUM(a.num2); ");
+        checkCombinerUsed(pigServer, "c", true);
+
+        pigServer.registerQuery("c = foreach b  generate group, COUNT(a.alph), SUM(a.num2); ");
+        checkCombinerUsed(pigServer, "c", true);
+
+        // projecting bag - combiner should not be used
+        pigServer.registerQuery("c = foreach b  generate group, a,  COUNT(a.alph), SUM(a.num2); ");
+        checkCombinerUsed(pigServer, "c", false);
+
+        // projecting bag - combiner should not be used
+        pigServer.registerQuery("c = foreach b  generate group, a.num2,  COUNT(a.alph), SUM(a.num2); ");
+        checkCombinerUsed(pigServer, "c", false);      
+        
+        pigServer.registerQuery("c = foreach b  generate group.$0, group.$1, COUNT(a.alph), SUM(a.num2); ");
+        checkCombinerUsed(pigServer, "c", true);
+
+        pigServer.registerQuery("c = foreach b  generate group.$0, group.$1 + COUNT(a.alph), SUM(a.num2); ");
+        checkCombinerUsed(pigServer, "c", true);
+        
+        pigServer.registerQuery("c = foreach b  generate group.str, group.$1, COUNT(a.alph), SUM(a.num2); ");
+        checkCombinerUsed(pigServer, "c", true);
+        
+        pigServer.registerQuery("c = foreach b  generate group.str, group.$1, COUNT(a.alph), SUM(a.num2), " +
+        		" (group.num1 == 1 ? (COUNT(a.num2) + 1)  : (SUM(a.num2) + 10)) ; ");
+        checkCombinerUsed(pigServer, "c", true);
+
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "('ABC',1,3L,6L,4L)",
+                            "('ABC',2,1L,4L,14L)",
+                            "('DEF',1,1L,1L,2L)",
+                            "('XYZ',1,1L,2L,2L)",
+                    });
+
+        Iterator<Tuple> it = pigServer.openIterator("c");
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+        Util.deleteFile(cluster, "testGroupElements.txt");
+        
+    }
+
+    private void checkCombinerUsed(PigServer pigServer, String alias, boolean combineExpected)
+    throws IOException {
+        // make sure there is a combine plan in the explain output
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        pigServer.explain(alias, ps);
+        boolean combinerFound = baos.toString().matches("(?si).*combine plan.*"); 
+        System.out.println(baos.toString());
+        assertEquals("is combiner present as expected", combineExpected, combinerFound);
+    }
+
 
     @Test
     public void testDistinctNoCombiner() throws Exception {

Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=1050757&r1=1050756&r2=1050757&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java Sun Dec 19 03:58:20 2010
@@ -431,7 +431,7 @@ public class TestMultiQueryCompiler {
             myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
             myPig.registerQuery("store c2 into '/tmp/output2';");
             myPig.registerQuery("d1 = group d by gid;");            
-            myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+            myPig.registerQuery("d2 = foreach d1 generate group, d.uname, MAX(d.uid) - MIN(d.uid);");
             myPig.registerQuery("store d2 into '/tmp/output3';");
              
             LogicalPlan lp = checkLogicalPlan(1, 3, 19);