Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/12 01:20:01 UTC

svn commit: r1540892 [1/2] - in /pig/branches/tez: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/util/ test...

Author: cheolsoo
Date: Tue Nov 12 00:20:00 2013
New Revision: 1540892

URL: http://svn.apache.org/r1540892
Log:
PIG-3555: Initial implementation of combiner optimization (cheolsoo)
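
For context, this optimization relies on Pig's Algebraic UDF contract: a function
declares INITIAL, INTERMEDIATE, and FINAL implementations so that partial results
can be computed in the map and combine stages. A minimal sketch of such a UDF (a
simplified COUNT; the class name MyCount and its layout are illustrative, not
part of this commit):

    import java.io.IOException;
    import org.apache.pig.Algebraic;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class MyCount extends EvalFunc<Long> implements Algebraic {
        @Override
        public Long exec(Tuple input) throws IOException {
            // Non-combined path: count the bag directly.
            return ((DataBag) input.get(0)).size();
        }
        public String getInitial() { return Initial.class.getName(); }
        public String getIntermed() { return Intermed.class.getName(); }
        public String getFinal() { return Final.class.getName(); }

        public static class Initial extends EvalFunc<Tuple> {
            @Override
            public Tuple exec(Tuple input) throws IOException {
                // Map stage: emit a partial count for this key.
                return TupleFactory.getInstance().newTuple(
                        ((DataBag) input.get(0)).size());
            }
        }
        public static class Intermed extends EvalFunc<Tuple> {
            @Override
            public Tuple exec(Tuple input) throws IOException {
                // Combine stage: fold partial counts into one partial count.
                return TupleFactory.getInstance().newTuple(sumPartials(input));
            }
        }
        public static class Final extends EvalFunc<Long> {
            @Override
            public Long exec(Tuple input) throws IOException {
                // Reduce stage: produce the final count.
                return sumPartials(input);
            }
        }
        // Sum the partial counts produced by the previous stage.
        private static long sumPartials(Tuple input) throws IOException {
            long sum = 0;
            for (Tuple t : (DataBag) input.get(0)) {
                sum += (Long) t.get(0);
            }
            return sum;
        }
    }

The optimizer rewrites a group-by + foreach plan so that Initial runs in the map,
Intermed in the combiner, and Final in the reduce.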

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
Modified:
    pig/branches/tez/src/org/apache/pig/PigServer.java
    pig/branches/tez/src/org/apache/pig/PigWarning.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
    pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
    pig/branches/tez/test/tez-tests

Modified: pig/branches/tez/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigServer.java?rev=1540892&r1=1540891&r2=1540892&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigServer.java Tue Nov 12 00:20:00 2013
@@ -182,6 +182,10 @@ public class PigServer {
         this(addExecTypeProperty(PropertiesUtil.loadDefaultProperties(), execTypeString));
     }
 
+    public PigServer(String execTypeString, Properties properties) throws ExecException, IOException {
+        this(addExecTypeProperty(properties, execTypeString));
+    }
+
     public PigServer(Properties properties) throws ExecException, IOException {
         this(new PigContext(properties));
     }
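
A hedged usage sketch of the new overload (the "tez" exec type string and the
property shown are assumptions for illustration; the constructor itself is what
this hunk adds):

    import java.util.Properties;
    import org.apache.pig.PigServer;
    import org.apache.pig.impl.util.PropertiesUtil;

    Properties props = PropertiesUtil.loadDefaultProperties();
    props.setProperty("pig.exec.mapPartAgg", "true"); // caller-supplied setting
    PigServer pigServer = new PigServer("tez", props); // throws ExecException, IOException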

Modified: pig/branches/tez/src/org/apache/pig/PigWarning.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigWarning.java?rev=1540892&r1=1540891&r2=1540892&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigWarning.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigWarning.java Tue Nov 12 00:20:00 2013
@@ -41,7 +41,7 @@ public enum PigWarning {
     IMPLICIT_CAST_TO_TUPLE,
     TOO_LARGE_FOR_INT,
     MULTI_LEAF_MAP,
-    MULTI_LEAF_REDUCE,
+    MULTI_ROOT_REDUCE,
     NON_PACKAGE_REDUCE_PLAN_ROOT,
     NON_EMPTY_COMBINE_PLAN,
     PROGRESS_REPORTER_NOT_PROVIDED,

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=1540892&r1=1540891&r2=1540892&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Tue Nov 12 00:20:00 2013
@@ -17,92 +17,26 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.pig.PigException;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.PigWarning;
-import org.apache.pig.data.DataType;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
-import org.apache.pig.impl.plan.optimizer.OptimizerException;
-import org.apache.pig.impl.util.Pair;
 
 /**
  * Optimize map reduce plans to use the combiner where possible.
- * Algebraic functions and distinct in a nested plan of a foreach are partially
- * computed in the map and combine phases.
- * A new foreach statement with the initial and intermediate forms of the algebraic
- * functions is added to the map and combine plans respectively.
- * 
- * If the bag portion of a group-by result is projected, or a non-algebraic
- * expression/udf takes a bag as input, the combiner is not used. This is because
- * the combiner in such cases is likely to degrade performance: the combine stage
- * would not reduce the data size enough to offset the cost of the additional
- * (de)serialization passes.
- * 
- * 
- * Major areas for enhancement:
- * 1. use of combiner in cogroup
- * 2. queries with order-by, limit or sort in a nested foreach after group-by
- * 3. case where group-by is followed by filter that has algebraic expression
- *
- * 
- *
- *
  */
 public class CombinerOptimizer extends MROpPlanVisitor {
-
-    private static final String DISTINCT_UDF_CLASSNAME = org.apache.pig.builtin.Distinct.class.getName();
-
-    private Log log = LogFactory.getLog(getClass());
-
-
     private CompilationMessageCollector messageCollector = null;
-
     private boolean doMapAgg;
 
     public CombinerOptimizer(MROperPlan plan, boolean doMapAgg) {
         this(plan, doMapAgg, new CompilationMessageCollector());
     }
 
-    public CombinerOptimizer(MROperPlan plan, boolean doMapAgg, 
+    public CombinerOptimizer(MROperPlan plan, boolean doMapAgg,
             CompilationMessageCollector messageCollector) {
-
         super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
         this.messageCollector = messageCollector;
         this.doMapAgg = doMapAgg;
@@ -114,897 +48,6 @@ public class CombinerOptimizer extends M
 
     @Override
     public void visitMROp(MapReduceOper mr) throws VisitorException {
-        log.trace("Entering CombinerOptimizer.visitMROp");
-        if (mr.reducePlan.isEmpty()) return;
-
-        // part one - check if this MR job represents a group-by + foreach
-        // Find the POLocalRearrange in the map.  I'll need it later.
-        List<PhysicalOperator> mapLeaves = mr.mapPlan.getLeaves();
-        if (mapLeaves == null || mapLeaves.size() != 1) {
-            messageCollector.collect("Expected map to have single leaf!", MessageType.Warning, PigWarning.MULTI_LEAF_MAP);
-            return;
-        }
-        PhysicalOperator mapLeaf = mapLeaves.get(0);
-        if (!(mapLeaf instanceof POLocalRearrange)) {
-            return;
-        }
-        POLocalRearrange rearrange = (POLocalRearrange)mapLeaf;
-
-        List<PhysicalOperator> reduceRoots = mr.reducePlan.getRoots();
-        if (reduceRoots.size() != 1) {
-            messageCollector.collect("Expected reduce to have single leaf", MessageType.Warning, PigWarning.MULTI_LEAF_REDUCE);
-            return;
-        }
-
-        // I expect that the first root should always be a POPackage.  If
-        // not, I don't know what's going on, so I'm out of here.
-        PhysicalOperator root = reduceRoots.get(0);
-        if (!(root instanceof POPackage)) {
-            messageCollector.collect("Expected reduce root to be a POPackage", MessageType.Warning, PigWarning.NON_PACKAGE_REDUCE_PLAN_ROOT);
-            return;
-        }
-        POPackage pack = (POPackage)root;
-
-        List<PhysicalOperator> packSuccessors =
-            mr.reducePlan.getSuccessors(root);
-        if (packSuccessors == null || packSuccessors.size() != 1) return;
-        PhysicalOperator successor = packSuccessors.get(0);
-
-        if (successor instanceof POLimit) {
-            //POLimit is acceptable, as long as it has a single foreach
-            // as successor
-            List<PhysicalOperator> limitSucs =
-                mr.reducePlan.getSuccessors(successor);
-            if(limitSucs != null && limitSucs.size() == 1 && 
-                    limitSucs.get(0) instanceof POForEach) {
-                // the code below will now further examine
-                // the foreach
-                successor = limitSucs.get(0);
-            }
-
-        } 
-        if (successor instanceof POForEach) {
-            POForEach foreach = (POForEach)successor;
-            List<PhysicalPlan> feInners = foreach.getInputPlans();
-
-            // find algebraic operators and also check if the foreach statement
-            // is suitable for combiner use
-            List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = 
-                findAlgebraicOps(feInners);
-            if(algebraicOps == null || algebraicOps.size() == 0){
-                // the plan is not combinable or there is nothing to combine;
-                // we're done
-                return;
-            }
-            if (mr.combinePlan.getRoots().size() != 0) {
-                messageCollector.collect("Wasn't expecting to find anything already "
-                        + "in the combiner!", MessageType.Warning, PigWarning.NON_EMPTY_COMBINE_PLAN);
-                return;
-            }
-
-            log.info("Choosing to move algebraic foreach to combiner");
-
-            try {
-
-
-                // replace PODistinct->Project[*] with distinct udf (which is Algebraic)
-                for(Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps ){
-                    if(! (op2plan.first instanceof PODistinct))
-                        continue;
-                    DistinctPatcher distinctPatcher = new DistinctPatcher(op2plan.second);
-                    distinctPatcher.visit();
-                    if(distinctPatcher.getDistinct() == null){
-                        int errCode = 2073;
-                        String msg = "Problem with replacing distinct operator with distinct built-in function.";
-                        throw new PlanException(msg, errCode, PigException.BUG);
-                    }
-                    op2plan.first = distinctPatcher.getDistinct();
-                }
-
-                //create new map foreach
-                POForEach mfe = createForEachWithGrpProj(foreach, rearrange.getKeyType());                
-                Map<PhysicalOperator, Integer> op2newpos = 
-                    new HashMap<PhysicalOperator, Integer>();
-                Integer pos = 1;
-                //create plan for each algebraic udf and add as inner plan in map-foreach 
-                for(Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps ){
-                    PhysicalPlan udfPlan = createPlanWithPredecessors(op2plan.first, op2plan.second);
-                    mfe.addInputPlan(udfPlan, false);
-                    op2newpos.put(op2plan.first, pos++);
-                }
-                changeFunc(mfe, POUserFunc.INITIAL);
-
-                // since we will only be creating SingleTupleBag as input to
-                // the map foreach, we should flag the POProjects in the map
-                // foreach inner plans to also use SingleTupleBag
-                for (PhysicalPlan mpl : mfe.getInputPlans()) {
-                    try {
-                        new fixMapProjects(mpl).visit();
-                    } catch (VisitorException e) {
-                        int errCode = 2089;
-                        String msg = "Unable to flag project operator to use single tuple bag.";
-                        throw new PlanException(msg, errCode, PigException.BUG, e);
-                    }
-                }
-
-                //create new combine foreach
-                POForEach cfe = createForEachWithGrpProj(foreach, rearrange.getKeyType());
-                //add algebraic functions with appropriate projection
-                addAlgebraicFuncToCombineFE(cfe, op2newpos);
-                changeFunc(cfe, POUserFunc.INTERMEDIATE);
-
-                //fix projection and function type for algebraic functions in reduce foreach
-                for(Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps ){
-                    setProjectInput(op2plan.first, op2plan.second, op2newpos.get(op2plan.first));
-                    ((POUserFunc)op2plan.first).setAlgebraicFunction(POUserFunc.FINAL);
-                }
-
-
-                // we have modified the foreach inner plans - so set them
-                // again for the foreach so that foreach can do any re-initialization
-                // around them.
-                // FIXME - this is a necessary evil right now because the leaves are explicitly
-                // stored in the POForeach as a list rather than computed each time at 
-                // run time from the plans for optimization. Do we want to have the Foreach
-                // compute the leaves each time and have Java optimize it (will Java optimize?)?
-                mfe.setInputPlans(mfe.getInputPlans());
-                cfe.setInputPlans(cfe.getInputPlans());
-                foreach.setInputPlans(foreach.getInputPlans());
-
-                //tell POCombinerPackage which fields need to be projected and
-                // which should be placed in bags. The first field is a simple project;
-                // the rest need to go into bags
-                int numFields = algebraicOps.size() + 1; // algebraic funcs + group key
-                boolean[] bags = new boolean[numFields];
-                bags[0] = false;
-                for (int i = 1; i < numFields; i++) {
-                    bags[i] = true;
-                }
-
-                // Use the POCombiner package in the combine plan
-                // as it needs to act differently than the regular
-                // package operator.
-                mr.combinePlan = new PhysicalPlan();
-                POCombinerPackage combinePack =
-                    new POCombinerPackage(pack, bags);
-                mr.combinePlan.add(combinePack);
-                mr.combinePlan.add(cfe);
-                mr.combinePlan.connect(combinePack, cfe);
-
-                // No need to connect projections in cfe to cp, because
-                // PigCombiner directly attaches output from package to
-                // root of remaining plan.
-
-                POLocalRearrange mlr = getNewRearrange(rearrange);
-
-                POPartialAgg mapAgg = null;
-                if(doMapAgg){
-                    mapAgg = createPartialAgg(cfe);
-                }
-
-                // A specialized local rearrange operator will replace
-                // the normal local rearrange in the map plan. This behaves
-                // like the regular local rearrange in the getNext() 
-                // as far as getting its input and constructing the 
-                // "key" out of the input. It then returns a tuple with
-                // two fields - the key in the first position and the
-                // "value" inside a bag in the second position. This output
-                // format resembles the format out of a Package. This output
-                // will feed to the map foreach which expects this format.
-                // If the key field isn't in the project of the combiner or map foreach,
-                // it is added to the end (This is required so that we can 
-                // set up the inner plan of the new Local Rearrange leaf in the map
-                // and combine plan to contain just the project of the key).
-                patchUpMap(mr.mapPlan, getPreCombinerLR(rearrange), mfe, mapAgg, mlr);
-                POLocalRearrange clr = getNewRearrange(rearrange);
-
-                mr.combinePlan.add(clr);
-                mr.combinePlan.connect(cfe, clr);
-
-                // Change the package operator in the reduce plan to
-                // be the POCombiner package, as it needs to act
-                // differently than the regular package operator.
-                POCombinerPackage newReducePack =
-                    new POCombinerPackage(pack, bags);
-                mr.reducePlan.replace(pack, newReducePack);
-
-                // the replace() above only changes
-                // the plan and does not change "inputs" to 
-                // operators
-                // set up "inputs" for the operator after
-                // package correctly
-                List<PhysicalOperator> packList = new ArrayList<PhysicalOperator>();
-                packList.add(newReducePack);
-                List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(newReducePack);
-                // there should be only one successor to package
-                sucs.get(0).setInputs(packList);
-            } catch (Exception e) {
-                int errCode = 2018;
-                String msg = "Internal error. Unable to introduce the combiner for optimization.";
-                throw new OptimizerException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-
-
-    /**
-     * Translate POForEach in combiner into a POPartialAgg
-     * @param combineFE
-     * @return partial aggregate operator
-     * @throws CloneNotSupportedException 
-     */
-    private POPartialAgg createPartialAgg(POForEach combineFE)
-            throws CloneNotSupportedException {
-        String scope = combineFE.getOperatorKey().scope;
-        POPartialAgg poAgg = new POPartialAgg(new OperatorKey(scope, 
-                NodeIdGenerator.getGenerator().getNextNodeId(scope)));
-        poAgg.addOriginalLocation(combineFE.getAlias(), combineFE.getOriginalLocations());
-        poAgg.setResultType(combineFE.getResultType());
-
-        //first plan in combine foreach is the group key
-        poAgg.setKeyPlan(combineFE.getInputPlans().get(0).clone());
-
-        List<PhysicalPlan> valuePlans = new ArrayList<PhysicalPlan>();
-        for(int i=1; i<combineFE.getInputPlans().size(); i++){
-            valuePlans.add(combineFE.getInputPlans().get(i).clone());
-        }
-        poAgg.setValuePlans(valuePlans);
-        return poAgg;
-    }
-
-    /**
-     * find algebraic operators and also check if the foreach statement
-     *  is suitable for combiner use
-     * @param feInners inner plans of foreach
-     * @return null if plan is not combinable, otherwise list of combinable operators
-     * @throws VisitorException
-     */
-    private List<Pair<PhysicalOperator, PhysicalPlan>> 
-    findAlgebraicOps(List<PhysicalPlan> feInners)
-    throws VisitorException {
-        ArrayList<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = new ArrayList<Pair<PhysicalOperator, PhysicalPlan>>();
-
-        //check each foreach inner plan
-        for(PhysicalPlan pplan : feInners){
-            //check for presence of non combinable operators
-            AlgebraicPlanChecker algChecker = new AlgebraicPlanChecker(pplan);
-            algChecker.visit();
-            if(algChecker.sawNonAlgebraic){
-                return null;
-            }
-
-            //if we found a combinable distinct add that to list
-            if(algChecker.sawDistinctAgg){
-                algebraicOps.add(new Pair<PhysicalOperator, PhysicalPlan>(algChecker.getDistinct(), pplan));
-                continue;
-            }
-
-
-            List<PhysicalOperator> roots = pplan.getRoots();
-            // combinable operators have to be attached to POProject root(s).
-            // If a root does not have a combinable successor, the project
-            // has to be projecting the group column. Otherwise this MR job
-            // is considered not combinable, as we don't want to use the combiner
-            // for cases where this foreach statement projects bags (likely to
-            // be bad for performance because of additional (de)serialization costs)
-
-            for(PhysicalOperator root : roots){
-                if(root instanceof ConstantExpression){
-                    continue;
-                }
-                if(! (root  instanceof POProject)){
-                    // how can this happen? - expect root of inner plan to be 
-                    // constant or project.  not combining it
-                    //TODO: Warn
-                    return null;
-                }
-                POProject proj = (POProject)root;
-                POUserFunc combineUdf = getAlgebraicSuccessor(proj, pplan);
-                if(combineUdf == null){
-                    
-                    if(proj.isProjectToEnd()){
-                        //project-star or project to end
-                        // not combinable
-                        return null;
-                    }
-                    
-                    // Check to see if this is a projection of the grouping column.
-                    // If so, it will be a projection of col 0 
-                    List<Integer> cols = proj.getColumns();
-                    if (cols != null && cols.size() == 1 && cols.get(0) == 0) {
-                        //it is project of grouping column, so the plan is still
-                        //combinable
-                        continue;
-                    }else{
-                        //not combinable
-                        return null;
-                    }
-                }
-
-                // The algebraic udf can have more than one input. Add the udf only once
-                boolean exist = false;
-                for (Pair<PhysicalOperator, PhysicalPlan> pair : algebraicOps) {
-                    if (pair.first.equals(combineUdf)) {
-                        exist = true;
-                        break;
-                    }
-                }
-                if (!exist)
-                    algebraicOps.add(new Pair<PhysicalOperator, PhysicalPlan>(combineUdf, pplan));
-            }
-        }
-
-        return algebraicOps;
+        CombinerOptimizerUtil.addCombiner(mr.mapPlan, mr.reducePlan, mr.combinePlan, messageCollector, doMapAgg);
     }
-
-    /**
-     * Look for an algebraic POUserFunc as successor to this project, called
-     * recursively to skip any other projects seen on the way.
-     * @param proj project
-     * @param pplan physical plan
-     * @return null if any operator other than POProject or algebraic POUserFunc is
-     * found while going down the plan, otherwise the algebraic POUserFunc is returned
-     */
-    private POUserFunc getAlgebraicSuccessor(POProject proj, PhysicalPlan pplan) {
-        //check if root is followed by combinable operator
-        List<PhysicalOperator> succs = pplan.getSuccessors(proj);
-        if(succs == null || succs.size() == 0){
-            return null;
-        }
-        if(succs.size() > 1){
-            //project shared by more than one operator - does not happen 
-            // in plans generated today
-            // won't try to combine this
-            return null;
-        }
-
-
-        PhysicalOperator succ = succs.get(0);
-        if(succ instanceof POProject){
-            return getAlgebraicSuccessor((POProject) succ, pplan);
-        }
-
-        if(succ instanceof POUserFunc && ((POUserFunc)succ).combinable() ){
-            return (POUserFunc)succ;
-        }
-
-        //some other operator ? can't combine
-        return null;
-    }
-    
-
-    /**
-     * Create a new foreach with the same scope and alias as the given foreach,
-     * and add an inner plan that projects the group column, which is going to be
-     * the first input
-     * @param foreach source foreach
-     * @param keyType type for group-by key
-     * @return new POForeach
-     */
-    private POForEach createForEachWithGrpProj(POForEach foreach, byte keyType) {
-        String scope = foreach.getOperatorKey().scope;
-        POForEach newFE = new POForEach(createOperatorKey(scope), new ArrayList<PhysicalPlan>());
-        newFE.addOriginalLocation(foreach.getAlias(), foreach.getOriginalLocations());
-        newFE.setResultType(foreach.getResultType());
-        //create plan that projects the group column 
-        PhysicalPlan grpProjPlan = new PhysicalPlan();
-        //group by column is the first column
-        POProject proj = new POProject(createOperatorKey(scope), 1, 0);
-        proj.setResultType(keyType);
-        grpProjPlan.add(proj);
-
-        newFE.addInputPlan(grpProjPlan, false);
-        return newFE;
-    }
-    
-    /**
-     * Create new plan and  add to it the clones of operator algeOp  and its 
-     * predecessors from the physical plan pplan .
-     * @param algeOp algebraic operator 
-     * @param pplan physical plan that has algeOp
-     * @return new plan
-     * @throws CloneNotSupportedException
-     * @throws PlanException
-     */
-    private PhysicalPlan createPlanWithPredecessors(PhysicalOperator algeOp, PhysicalPlan pplan)
-    throws CloneNotSupportedException, PlanException {
-        PhysicalPlan newplan = new PhysicalPlan();
-        addPredecessorsToPlan(algeOp, pplan, newplan);
-        return newplan;
-    }
-
-    /**
-     * Recursively clone op and its predecessors from pplan and add them to newplan
-     * @param op
-     * @param pplan
-     * @param newplan
-     * @return
-     * @throws CloneNotSupportedException
-     * @throws PlanException
-     */
-    private PhysicalOperator addPredecessorsToPlan(PhysicalOperator op, PhysicalPlan pplan,
-            PhysicalPlan newplan)
-    throws CloneNotSupportedException, PlanException {
-        PhysicalOperator newOp = op.clone();
-        newplan.add(newOp);
-        if(pplan.getPredecessors(op) == null || pplan.getPredecessors(op).size() == 0){
-            return newOp;
-        }        
-        for(PhysicalOperator pred : pplan.getPredecessors(op)){
-            PhysicalOperator newPred = addPredecessorsToPlan(pred, pplan, newplan);
-            newplan.connect(newPred, newOp);
-        }
-        return newOp;
-    }
-    
-
-
-
-    /**
-     * add algebraic functions with appropriate projection to new foreach in combiner
-     * @param cfe - the new foreach in combiner 
-     * @param op2newpos - mapping of physical operator to position in input
-     * @throws CloneNotSupportedException
-     * @throws PlanException
-     */
-    private void addAlgebraicFuncToCombineFE(POForEach cfe, Map<PhysicalOperator, Integer> op2newpos)
-    throws CloneNotSupportedException, PlanException {
-
-        // an array that we will first populate with physical operators in order
-        // of their position in the input. Used while adding plans to the combine
-        // foreach so that the output of the combine foreach has the same positions
-        // as its input. That means the same operator-to-position mapping can be
-        // used by the reduce as well
-        PhysicalOperator[] opsInOrder = new PhysicalOperator[op2newpos.size() + 1];
-        for(Map.Entry<PhysicalOperator, Integer> op2pos : op2newpos.entrySet()){
-            opsInOrder[op2pos.getValue()] = op2pos.getKey();
-        }
-
-        // first position is used by group column and a plan has been added for it,
-        //so start with 1
-        for(int i=1; i < opsInOrder.length; i++){
-            //create new inner plan for foreach
-            //add cloned copy of given physical operator and a new project.
-            // Even if the udf in the query takes multiple inputs, only one project
-            // needs to be added, because the input to this udf
-            // will be the INITIAL version of the udf evaluated in the map.
-            PhysicalPlan newPlan = new PhysicalPlan();
-            PhysicalOperator newOp = opsInOrder[i].clone();
-            newPlan.add(newOp);
-            POProject proj = new POProject(
-                    createOperatorKey(cfe.getOperatorKey().getScope()),
-                    1, i
-            );
-            proj.setResultType(DataType.BAG);
-            newPlan.add(proj);
-            newPlan.connect(proj, newOp);
-            cfe.addInputPlan(newPlan, false);
-        }
-    }
-
-    /**
-     * Replace old POLocalRearrange with new pre-combine LR,
-     * add new map foreach, new map-local-rearrange, and connect them
-     * 
-     * @param mapPlan
-     * @param preCombinerLR
-     * @param mfe
-     * @param mapAgg 
-     * @param mlr
-     * @throws PlanException 
-     */
-    private void patchUpMap(PhysicalPlan mapPlan, POPreCombinerLocalRearrange preCombinerLR,
-            POForEach mfe, POPartialAgg mapAgg, POLocalRearrange mlr)
-                    throws PlanException {
-
-        POLocalRearrange oldLR = (POLocalRearrange)mapPlan.getLeaves().get(0);
-        mapPlan.replace(oldLR, preCombinerLR);
-
-        mapPlan.add(mfe);
-        mapPlan.connect(preCombinerLR, mfe);
-
-        //the operator before local rearrange
-        PhysicalOperator opBeforeLR = mfe;
-
-        if(mapAgg != null){
-            mapPlan.add(mapAgg);
-            mapPlan.connect(mfe, mapAgg);
-            opBeforeLR = mapAgg;
-        }
-
-        mapPlan.add(mlr);
-        mapPlan.connect(opBeforeLR, mlr);
-    }
-
-    /**
-     * @param rearrange
-     * @return
-     */
-    private POPreCombinerLocalRearrange getPreCombinerLR(POLocalRearrange rearrange) {
-
-        String scope = rearrange.getOperatorKey().scope;
-        POPreCombinerLocalRearrange pclr = new POPreCombinerLocalRearrange(
-                createOperatorKey(scope),
-                rearrange.getRequestedParallelism(), rearrange.getInputs());
-        pclr.setPlans(rearrange.getPlans());
-        return pclr;
-    }
-
-    private OperatorKey createOperatorKey(String scope) {
-        return new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope));
-    }
-
-
-    /**
-     * @param op
-     * @param index 
-     * @param plan 
-     * @throws PlanException 
-     */
-    private void setProjectInput(PhysicalOperator op, PhysicalPlan plan, int index) throws PlanException {
-        String scope = op.getOperatorKey().scope;
-        POProject proj = new POProject(new OperatorKey(scope, 
-                NodeIdGenerator.getGenerator().getNextNodeId(scope)),
-                op.getRequestedParallelism(), index);
-        proj.setResultType(DataType.BAG);
-        // Remove old connections and elements from the plan
-        plan.trimAbove(op);
-        plan.add(proj);
-        plan.connect(proj, op);
-        List<PhysicalOperator> inputs =
-            new ArrayList<PhysicalOperator>(1);
-        inputs.add(proj);
-        op.setInputs(inputs);
-
-    }
-
-    /**
-     * Change the algebraic function type for algebraic functions in map and combine.
-     * In map and combine the algebraic function will be the leaf of the plan
-     * @param fe
-     * @param type
-     * @throws PlanException
-     */
-    private void changeFunc(POForEach fe, byte type) throws PlanException {
-        for(PhysicalPlan plan : fe.getInputPlans()){
-            List<PhysicalOperator> leaves = plan.getLeaves();
-            if (leaves == null || leaves.size() != 1) {
-                int errCode = 2019;
-                String msg = "Expected to find plan with single leaf. Found " + leaves.size() + " leaves.";
-                throw new PlanException(msg, errCode, PigException.BUG);
-            }
-
-            PhysicalOperator leaf = leaves.get(0);
-            if(leaf instanceof POProject){
-                continue;
-            }
-            if (!(leaf instanceof POUserFunc)) {
-                int errCode = 2020;
-                String msg = "Expected to find plan with UDF or project leaf. Found " + leaf.getClass().getSimpleName();
-                throw new PlanException(msg, errCode, PigException.BUG);
-            }
-
-            POUserFunc func = (POUserFunc)leaf;
-            try {
-                func.setAlgebraicFunction(type);
-            } catch (ExecException e) {
-                int errCode = 2075;
-                String msg = "Could not set algebraic function type.";
-                throw new PlanException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-
-
-    /**
-     * create new Local rearrange by cloning existing rearrange and 
-     * add plan for projecting the key
-     * @param rearrange
-     * @return
-     * @throws PlanException
-     * @throws CloneNotSupportedException
-     */
-    private POLocalRearrange getNewRearrange(POLocalRearrange rearrange)
-    throws PlanException, CloneNotSupportedException {
-        
-        POLocalRearrange newRearrange = rearrange.clone();
-        
-        // Set the projection to be the key
-        PhysicalPlan newPlan = new PhysicalPlan();
-        String scope = newRearrange.getOperatorKey().scope;
-        POProject proj = new POProject(new OperatorKey(scope, 
-                NodeIdGenerator.getGenerator().getNextNodeId(scope)), -1, 0);
-        proj.setResultType(newRearrange.getKeyType());
-        newPlan.add(proj);
-        
-        List<PhysicalPlan> plans = new ArrayList<PhysicalPlan>(1);
-        plans.add(newPlan);
-        newRearrange.setPlansFromCombiner(plans);
-        
-        return newRearrange;
-    }
-
-    /**
-     * Checks if there is something that prevents the use of algebraic interface,
-     * and looks for the PODistinct that can be used as algebraic
-     * 
-     */
-    private static class AlgebraicPlanChecker extends PhyPlanVisitor {
-        boolean sawNonAlgebraic = false;
-        boolean sawDistinctAgg = false;
-        private boolean sawForeach = false;
-        private PODistinct distinct = null;
-
-
-        AlgebraicPlanChecker(PhysicalPlan plan) {
-            super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
-        }
-
-        /* (non-Javadoc)
-         * @see org.apache.pig.impl.plan.PlanVisitor#visit()
-         */
-        @Override
-        public void visit() throws VisitorException {
-            super.visit();
-            // if we saw a foreach and a distinct agg, it's ok;
-            // else if we only saw a foreach, mark the plan as non-algebraic
-            if(sawForeach && !sawDistinctAgg) {
-                sawNonAlgebraic = true;
-            }
-        }
-
-        @Override
-        public void visitDistinct(PODistinct distinct) throws VisitorException {
-            this.distinct = distinct;
-            if(sawDistinctAgg) {
-                // we want to combine only in the case where there is only
-                // one PODistinct which is the only input to an agg;
-                // we apparently have seen a PODistinct before, so let's not
-                // combine.
-                sawNonAlgebraic = true;
-                return;
-            }
-            // check that this distinct is the only input to an agg
-            // We could have the following two cases
-            // script 1:
-            // ..
-            // b = group a by ...
-            // c = foreach b { x = distinct a; generate AGG(x), ...}
-            // The above script leads to the following plan for AGG(x):
-            // POUserFunc(org.apache.pig.builtin.COUNT)[long] 
-            //   |
-            //   |---Project[bag][*] 
-            //       |
-            //       |---PODistinct[bag] 
-            //           |
-            //           |---Project[tuple][1] 
-
-            // script 2:
-            // ..
-            // b = group a by ...
-            // c = foreach b { x = distinct a; generate AGG(x.$1), ...}
-            // The above script leads to the following plan for AGG(x.$1):
-            // POUserFunc(org.apache.pig.builtin.IntSum)[long]
-            //   |
-            //   |---Project[bag][1]
-            //       |
-            //       |---Project[bag][*]
-            //           |
-            //           |---PODistinct[bag]
-            //               |
-            //               |---Project[tuple][1]
-            // So tracing from the PODistinct to its successors up to the leaf, we should
-            // see a Project[bag][*] as the immediate successor and an optional Project[bag]
-            // as the next successor until we see the leaf.
-            PhysicalOperator leaf = mPlan.getLeaves().get(0);
-            // the leaf has to be a POUserFunc (need not be algebraic)
-            if(leaf instanceof POUserFunc) {
-
-                // we want to combine only in the case where there is only
-                // one PODistinct which is the only input to an agg.
-                // Do not combine if there are additional inputs.
-                List<PhysicalOperator> preds = mPlan.getPredecessors(leaf);
-                if (preds.size() > 1) {
-                    sawNonAlgebraic = true;
-                    return;
-                }
-
-                List<PhysicalOperator> immediateSuccs = mPlan.getSuccessors(distinct);
-                if(immediateSuccs.size() == 1 && immediateSuccs.get(0) instanceof POProject) {
-                    if(checkSuccessorIsLeaf(leaf, immediateSuccs.get(0))) { // script 1 above
-                        sawDistinctAgg = true;
-                        return;
-                    } else { // check for script 2 scenario above
-                        List<PhysicalOperator> nextSuccs = mPlan.getSuccessors(immediateSuccs.get(0));
-                        if(nextSuccs.size() == 1) {
-                            PhysicalOperator op = nextSuccs.get(0);
-                            if(op instanceof POProject) {
-                                if(checkSuccessorIsLeaf(leaf, op)) {
-                                    sawDistinctAgg = true;
-                                    return;
-                                }
-                            }
-                        }
-
-                    }
-                }
-            }
-            // if we did not return above, that means we did not see
-            // the pattern we expected
-            sawNonAlgebraic = true;
-        }
-
-        /**
-         * @return the distinct
-         */
-        public PODistinct getDistinct() {
-            if(sawNonAlgebraic)
-                return null;
-            return distinct;
-        }
-
-        @Override
-        public void visitLimit(POLimit limit) throws VisitorException {
-            sawNonAlgebraic = true;
-        }
-
-        private boolean checkSuccessorIsLeaf(PhysicalOperator leaf, PhysicalOperator opToCheck) {
-            List<PhysicalOperator> succs = mPlan.getSuccessors(opToCheck);
-            if(succs.size() == 1) {
-                PhysicalOperator op = succs.get(0);
-                if(op == leaf) {
-                    return true;
-                }
-            }
-            return false;
-        }
-
-        @Override
-        public void visitFilter(POFilter filter) throws VisitorException {
-            sawNonAlgebraic = true;
-        }
-
-        @Override
-        public void visitPOForEach(POForEach fe) throws VisitorException {
-            // we need to allow foreach as input for distinct
-            // but don't want it for other things (why?). So let's
-            // flag the presence of Foreach and if this is present
-            // with a distinct agg, it will be allowed.
-            sawForeach = true;
-        }
-
-        @Override
-        public void visitSort(POSort sort) throws VisitorException {
-            sawNonAlgebraic = true;
-        }
-
-    }
-
-    /**
-     * A visitor to replace   
-     * Project[bag][*] 
-     *  |
-     *  |---PODistinct[bag]
-     * with 
-     * POUserFunc(org.apache.pig.builtin.Distinct)[DataBag]    
-     */
-    private static class DistinctPatcher extends PhyPlanVisitor {
-
-        private POUserFunc distinct = null;
-        /**
-         * @param plan
-         * @param walker
-         */
-        public DistinctPatcher(PhysicalPlan plan,
-                PlanWalker<PhysicalOperator, PhysicalPlan> walker) {
-            super(plan, walker);
-        }
-
-        /**
-         * @param physicalPlan
-         */
-        public DistinctPatcher(PhysicalPlan physicalPlan) {
-            this(physicalPlan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(physicalPlan));
-        }
-
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
-         */
-        @Override
-        public void visitProject(POProject proj) throws VisitorException {
-            // check if this project is preceded by PODistinct and
-            // has the return type bag
-
-
-            List<PhysicalOperator> preds = mPlan.getPredecessors(proj);
-            if(preds == null) return; // this is a leaf project and so not interesting for patching
-            PhysicalOperator pred = preds.get(0);
-            if(preds.size() == 1 && pred instanceof PODistinct) {
-                if(distinct != null) {
-                    // we should not already have been patched since the
-                    // Project-Distinct pair should occur only once
-                    int errCode = 2076;
-                    String msg = "Unexpected Project-Distinct pair while trying to set up plans for use with combiner.";
-                    throw new OptimizerException(msg, errCode, PigException.BUG);
-                }
-                // we have to stick in the POUserFunc(org.apache.pig.builtin.Distinct)[DataBag]
-                // in place of the Project-PODistinct pair
-                PhysicalOperator distinctPredecessor = mPlan.getPredecessors(pred).get(0);
-
-                POUserFunc func = null;
-
-                try {
-                    String scope = proj.getOperatorKey().scope;
-                    List<PhysicalOperator> funcInput = new ArrayList<PhysicalOperator>();
-                    FuncSpec fSpec = new FuncSpec(DISTINCT_UDF_CLASSNAME);
-                    funcInput.add(distinctPredecessor);
-                    // explicitly set distinctPredecessor's result type to
-                    // be tuple - this is relevant when distinctPredecessor is
-                    // originally a POForeach with return type BAG - we need to
-                    // set it to tuple so we get a stream of tuples. 
-                    distinctPredecessor.setResultType(DataType.TUPLE);
-                    func = new POUserFunc(new OperatorKey(scope, 
-                            NodeIdGenerator.getGenerator().getNextNodeId(scope)),-1, funcInput, fSpec);
-                    func.setResultType(DataType.BAG);
-                    mPlan.replace(proj, func);
-                    mPlan.remove(pred);
-                    // connect the newly added "func" to
-                    // the predecessor to the earlier PODistinct
-                    mPlan.connect(distinctPredecessor, func);
-                } catch (PlanException e) {
-                    int errCode = 2077;
-                    String msg = "Problem with reconfiguring plan to add distinct built-in function.";
-                    throw new OptimizerException(msg, errCode, PigException.BUG, e);
-                }
-                distinct = func;
-            } 
-        }
-
-        POUserFunc getDistinct(){
-            return distinct;
-        }
-
-
-    }
-
-    private static class fixMapProjects extends PhyPlanVisitor {
-
-        public fixMapProjects(PhysicalPlan plan) {
-            this(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
-                    plan));
-        }
-
-        /**
-         * @param plan
-         * @param walker
-         */
-        public fixMapProjects(PhysicalPlan plan,
-                PlanWalker<PhysicalOperator, PhysicalPlan> walker) {
-            super(plan, walker);
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
-         */
-        @Override
-        public void visitProject(POProject proj) throws VisitorException {
-            if (proj.getResultType() == DataType.BAG) {
-
-                // IMPORTANT ASSUMPTION:
-                // we should be calling this visitor only for
-                // fixing up the projects in the map's foreach
-                // inner plan. In the map side, we are dealing
-                // with single tuple bags - so set the flag in
-                // the project to use single tuple bags. If in
-                // future we don't have single tuple bags in the
-                // input to map's foreach, we should NOT be doing
-                // this!
-                proj.setResultSingleTupleBag(true);
-
-            }
-        }
-
-    }
-
 }
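
Both the MapReduce and Tez optimizers now delegate to
CombinerOptimizerUtil.addCombiner(). The utility's source is not in this part of
the commit, but from the two call sites its entry point has roughly the following
shape (a sketch inferred from usage; parameter names are illustrative and the
body is elided):

    package org.apache.pig.backend.hadoop.executionengine.util;

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.impl.plan.CompilationMessageCollector;
    import org.apache.pig.impl.plan.VisitorException;

    public class CombinerOptimizerUtil {
        /**
         * Detects the group-by + algebraic-foreach pattern across the given
         * map and reduce plans and, when safe, populates combinePlan with the
         * INTERMEDIATE stage, optionally adding map-side partial aggregation
         * when doMapAgg is set.
         */
        public static void addCombiner(PhysicalPlan mapPlan, PhysicalPlan reducePlan,
                PhysicalPlan combinePlan, CompilationMessageCollector messageCollector,
                boolean doMapAgg) throws VisitorException {
            // Logic formerly in mapReduceLayer.CombinerOptimizer.visitMROp()
            // moves here; see the deleted code above.
        }
    }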

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java?rev=1540892&r1=1540891&r2=1540892&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java Tue Nov 12 00:20:00 2013
@@ -31,7 +31,7 @@ import org.apache.pig.impl.PigContext;
 public class MRExecType implements ExecType {
 
     private static final long serialVersionUID = 1L;
-    private static final String[] modes = { "MAPREDUCE", "MAPRED" };
+    private static final String[] modes = { "MAPREDUCE", "MAPRED", "MR" };
 
     @Override
     public boolean accepts(Properties properties) {

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java?rev=1540892&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java Tue Nov 12 00:20:00 2013
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Optimize tez plans to use the combiner where possible.
+ */
+public class CombinerOptimizer extends TezOpPlanVisitor {
+    private CompilationMessageCollector messageCollector = null;
+    private TezOperPlan parentPlan;
+    private boolean doMapAgg;
+
+    public CombinerOptimizer(TezOperPlan plan, boolean doMapAgg) {
+        this(plan, doMapAgg, new CompilationMessageCollector());
+    }
+
+    public CombinerOptimizer(TezOperPlan plan, boolean doMapAgg,
+            CompilationMessageCollector messageCollector) {
+        super(plan, new DepthFirstWalker<TezOperator, TezOperPlan>(plan));
+        this.messageCollector = messageCollector;
+        this.doMapAgg = doMapAgg;
+        this.parentPlan = plan;
+    }
+
+    public CompilationMessageCollector getMessageCollector() {
+        return messageCollector;
+    }
+
+    @Override
+    public void visitTezOp(TezOperator to) throws VisitorException {
+        List<POPackage> packages = PlanHelper.getPhysicalOperators(to.plan, POPackage.class);
+        if (packages.isEmpty()) {
+            return;
+        }
+
+        List<TezOperator> predecessors = parentPlan.getPredecessors(to);
+        if (predecessors == null) {
+            return;
+        }
+
+        for (TezOperator from : predecessors) {
+            List<POLocalRearrange> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrange.class);
+            if (rearranges.isEmpty()) {
+                continue;
+            }
+
+            // Detected the POLocalRearrange -> POPackage pattern. Let's add a
+            // combiner if possible.
+            PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
+            // TODO: Right now, CombinerOptimizerUtil doesn't handle a single map
+            // plan with multiple POLocalRearrange leaves, e.g. SPLIT + multiple
+            // GROUP BY with different keys.
+            CombinerOptimizerUtil.addCombiner(from.plan, to.plan, combinePlan, messageCollector, doMapAgg);
+        }
+    }
+}
+
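
Like its MapReduce counterpart, the Tez optimizer is driven by instantiating it
over the operator plan and walking it. A hedged usage sketch (tezPlan and pc are
assumed to be in scope, e.g. in TezLauncher; the "pig.exec.mapPartAgg" key is
borrowed from the MR side and is an assumption here):

    // Enable in-map partial aggregation if the user asked for it.
    boolean doMapAgg = "true".equalsIgnoreCase(
            pc.getProperties().getProperty("pig.exec.mapPartAgg"));
    CombinerOptimizer co = new CombinerOptimizer(tezPlan, doMapAgg);
    co.visit(); // walks each TezOperator and fills per-edge combine plans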

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1540892&r1=1540891&r2=1540892&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Tue Nov 12 00:20:00 2013
@@ -151,7 +151,7 @@ public class TezCompiler extends PhyPlan
     public TezOperPlan getTezPlan() {
         return tezPlan;
     }
-    
+
     // Segment a single DAG into a DAG graph
     public TezPlanContainer getPlanContainer() throws PlanException {
         TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext);
@@ -343,6 +343,9 @@ public class TezCompiler extends PhyPlan
         for (TezOperator tezOp : compiledInputs) {
             tezOp.setClosed(true);
             tezPlan.connect(tezOp, newTezOp);
+            // Add edge descriptors to old and new operators
+            newTezOp.inEdges.put(tezOp.getOperatorKey(), new TezEdgeDescriptor());
+            tezOp.outEdges.put(newTezOp.getOperatorKey(), new TezEdgeDescriptor());
         }
         curTezOp = newTezOp;
     }
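
TezEdgeDescriptor is added by this commit but its source is not shown in this
part of the diff. From its uses in TezCompiler and TezDagBuilder (combinePlan,
inputClassName, outputClassName), a plausible sketch looks like this (the field
defaults assume the shuffle edge that TezDagBuilder previously hard-coded):

    package org.apache.pig.backend.hadoop.executionengine.tez;

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.tez.runtime.library.input.ShuffledMergedInput;
    import org.apache.tez.runtime.library.output.OnFileSortedOutput;

    public class TezEdgeDescriptor {
        // Combine plan that runs on this edge; empty until CombinerOptimizer fills it.
        public PhysicalPlan combinePlan = new PhysicalPlan();

        // Input/output classes for the Tez edge; default to a shuffle edge,
        // matching the EdgeProperty that TezDagBuilder used to hard-code.
        public String inputClassName = ShuffledMergedInput.class.getName();
        public String outputClassName = OnFileSortedOutput.class.getName();
    }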

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1540892&r1=1540891&r2=1540892&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Nov 12 00:20:00 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
@@ -64,14 +65,17 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigIntWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigLongWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezPOPackageAnnotator.LoRearrangeDiscoverer;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -87,20 +91,17 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.mapreduce.combine.MRCombiner;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 
 /**
  * A visitor to construct a DAG out of the Tez plan.
@@ -130,34 +131,82 @@ public class TezDagBuilder extends TezOp
             throw new VisitorException("Cannot create vertex for " + tezOp.name(), e);
         }
 
-        // Connect the new vertex with dependent vertices
+        // Connect the new vertex with predecessor vertices
         TezOperPlan tezPlan =  getPlan();
         List<TezOperator> predecessors = tezPlan.getPredecessors(tezOp);
         if (predecessors != null) {
             for (TezOperator predecessor : predecessors) {
-                // TODO: We should encapsulate edge properties in TezOperator.
-                // For now, we always create a shuffle edge.
-                EdgeProperty prop = new EdgeProperty(
-                        DataMovementType.SCATTER_GATHER,
-                        DataSourceType.PERSISTED,
-                        SchedulingType.SEQUENTIAL,
-                        new OutputDescriptor(OnFileSortedOutput.class.getName()),
-                        new InputDescriptor(ShuffledMergedInput.class.getName()));
-                // Since this is a dependency order walker, dependent vertices
+                // Since this is a dependency order walker, predecessor vertices
                 // must have already been created.
                 Vertex from = dag.getVertex(predecessor.name());
+                EdgeProperty prop = null;
+                try {
+                    prop = newEdge(predecessor, tezOp);
+                } catch (IOException e) {
+                    throw new VisitorException("Cannot create edge from " +
+                            predecessor.name() + " to " + tezOp.name(), e);
+                }
                 Edge edge = new Edge(from, to, prop);
                 dag.addEdge(edge);
             }
         }
     }
 
+    /**
+     * Return the EdgeProperty that connects the two given vertices.
+     * @param from the TezOperator of the predecessor vertex
+     * @param to the TezOperator of the successor vertex
+     * @return EdgeProperty
+     * @throws IOException
+     */
+    private EdgeProperty newEdge(TezOperator from, TezOperator to) throws IOException {
+        TezEdgeDescriptor edge = to.inEdges.get(from.getOperatorKey());
+        PhysicalPlan combinePlan = edge.combinePlan;
+
+        InputDescriptor in = new InputDescriptor(edge.inputClassName);
+        OutputDescriptor out = new OutputDescriptor(edge.outputClassName);
+
+        if (!combinePlan.isEmpty()) {
+            Configuration conf = new Configuration();
+            addCombiner(combinePlan, conf);
+            in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+            out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+        }
+
+        return new EdgeProperty(
+                edge.dataMovementType,
+                edge.dataSourceType,
+                edge.schedulingType,
+                out, in);
+    }
+
+    private void addCombiner(PhysicalPlan combinePlan, Configuration conf) throws IOException {
+        POPackage combPack = (POPackage)combinePlan.getRoots().get(0);
+        setIntermediateInputKeyValue(combPack.getKeyType(), conf);
+
+        POLocalRearrange combRearrange = (POLocalRearrange)combinePlan.getLeaves().get(0);
+        setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf);
+
+        LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(combinePlan, combPack);
+        lrDiscoverer.visit();
+
+        combinePlan.remove(combPack);
+        conf.set(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
+        conf.set(MRJobConfig.COMBINE_CLASS_ATTR, PigCombiner.Combine.class.getName());
+        conf.setBoolean("mapred.mapper.new-api", true);
+        conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
+        conf.set("pig.combinePlan", ObjectSerializer.serialize(combinePlan));
+        conf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
+        conf.set("pig.map.keytype", ObjectSerializer.serialize(new byte[] {combRearrange.getKeyType()}));
+    }
+
     private Vertex newVertex(TezOperator tezOp) throws IOException {
         ProcessorDescriptor procDesc = new ProcessorDescriptor(tezOp.getProcessorName());
 
         // Pass physical plans to vertex as user payload.
         Configuration conf = new Configuration();
         // We won't actually use this job, but we need it to talk with the Load/Store funcs
+        @SuppressWarnings("deprecation")
         Job job = new Job(conf);
 
         ArrayList<POStore> storeLocations = new ArrayList<POStore>();
@@ -219,39 +268,33 @@ public class TezDagBuilder extends TezOp
             conf.set("pig.streaming.task.output.dir", tmpLocation.toString());
         }
 
-        if (!pc.inIllustrator)
-        {
-            // unset inputs for POStore, otherwise, exec plan will be unnecessarily deserialized
+        if (!pc.inIllustrator) {
+            // Unset inputs for POStore; otherwise, the exec plan will be unnecessarily deserialized
             for (POStore st: stores) { st.setInputs(null); st.setParentPlan(null);}
             // We put them in the reduce because PigOutputCommitter checks the ID of the task to see if it's a map, and if not, calls the reduce committers.
             conf.set(JobControlCompiler.PIG_MAP_STORES, ObjectSerializer.serialize(new ArrayList<POStore>()));
             conf.set(JobControlCompiler.PIG_REDUCE_STORES, ObjectSerializer.serialize(stores));
         }
 
-        // Configure the classes for incoming shuffles to this TezOp
+        // For all shuffle outputs, configure the classes
         List<PhysicalOperator> leaves = tezOp.plan.getLeaves();
-        // TODO: Actually need to loop over leaves and set up per shuffle. Not sure how to give confs to an edge in Tez.
-        if (leaves.size() == 1 && leaves.get(0) instanceof POLocalRearrange){
-            byte keyType =  ((POLocalRearrange)leaves.get(0)).getKeyType();
-            Class<? extends WritableComparable> keyClass = HDataType.getWritableComparableTypes(keyType).getClass();
-            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, keyClass.getName());
-            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, NullableTuple.class.getName());
-            conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, MRPartitioner.class.getName());
-            selectComparator(tezOp, keyType, conf);
+        // TODO: For multiple POLocalRearrange leaves, we need to loop over
+        // the leaves and set up one shuffle output per leaf, e.g. a SPLIT
+        // followed by multiple GROUP BYs on different keys.
+        if (leaves.size() == 1 && leaves.get(0) instanceof POLocalRearrange) {
+            byte keyType = ((POLocalRearrange)leaves.get(0)).getKeyType();
+            setIntermediateOutputKeyValue(keyType, conf);
         }
 
-        // For all shuffle outputs, configure the classes
+        // Configure the classes for incoming shuffles to this TezOp
         List<PhysicalOperator> roots = tezOp.plan.getRoots();
-        // TODO: Same as for the leaves, need to loop for multiple outputs.
-        if (roots.size() == 1 && roots.get(0) instanceof POPackage){
+        if (roots.size() == 1 && roots.get(0) instanceof POPackage) {
             POPackage pack = (POPackage) roots.get(0);
+            byte keyType = pack.getKeyType();
             tezOp.plan.remove(pack);
             conf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
-            conf.set("pig.reduce.key.type", Byte.toString(pack.getKeyType()));
-            Class<? extends WritableComparable> keyClass = HDataType.getWritableComparableTypes(pack.getKeyType()).getClass();
-            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, keyClass.getName());
-            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, NullableTuple.class.getName());
-            selectInputComparator(conf, pack.getKeyType());
+            conf.set("pig.reduce.key.type", Byte.toString(keyType));
+            setIntermediateInputKeyValue(keyType, conf);
             conf.setClass("pig.input.handler.class", ShuffledInputHandler.class, InputHandler.class);
         } else {
             conf.setClass("pig.input.handler.class", FileInputHandler.class, InputHandler.class);
@@ -259,10 +302,8 @@ public class TezDagBuilder extends TezOp
 
         conf.setClass("mapreduce.outputformat.class", PigOutputFormat.class, OutputFormat.class);
 
-        // Serialize the execution plans
+        // Serialize the execution plan
         conf.set(PigProcessor.PLAN, ObjectSerializer.serialize(tezOp.plan));
-        // TODO: The combiners need to be associated with the shuffle edges
-        conf.set(PigProcessor.COMBINE_PLAN, ObjectSerializer.serialize(tezOp.combinePlan));
         UDFContext.getUDFContext().serialize(conf);
 
         // Take our assembled configuration and create a vertex
@@ -336,7 +377,6 @@ public class TezDagBuilder extends TezOp
                 inpSignatureLists.add(ld.getSignature());
                 inpLimits.add(ld.getLimit());
                 //Remove the POLoad from the plan
-                //  if (!pigContext.inIllustrator)
                 tezOp.plan.remove(ld);
             }
         }
@@ -349,7 +389,24 @@ public class TezDagBuilder extends TezOp
         return (lds.size() > 0);
     }
 
-    private void selectInputComparator(Configuration conf, byte keyType) throws JobCreationException {
+    @SuppressWarnings("rawtypes")
+    private void setIntermediateInputKeyValue(byte keyType, Configuration conf) throws JobCreationException, ExecException {
+        Class<? extends WritableComparable> keyClass = HDataType.getWritableComparableTypes(keyType).getClass();
+        conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, keyClass.getName());
+        conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, NullableTuple.class.getName());
+        selectInputComparator(keyType, conf);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private void setIntermediateOutputKeyValue(byte keyType, Configuration conf) throws JobCreationException, ExecException {
+        Class<? extends WritableComparable> keyClass = HDataType.getWritableComparableTypes(keyType).getClass();
+        conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, keyClass.getName());
+        conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, NullableTuple.class.getName());
+        conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, MRPartitioner.class.getName());
+        selectOutputComparator(keyType, conf);
+    }
+
+    private void selectInputComparator(byte keyType, Configuration conf) throws JobCreationException {
         //TODO: Handle sorting like in JobControlCompiler
 
         switch (keyType) {
@@ -414,10 +471,7 @@ public class TezDagBuilder extends TezOp
         }
     }
 
-    private void selectComparator(
-            TezOperator tezOp,
-            byte keyType,
-            Configuration conf) throws JobCreationException {
+    private void selectOutputComparator(byte keyType, Configuration conf) throws JobCreationException {
         //TODO: Handle sorting like in JobControlCompiler
 
         switch (keyType) {

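Note on newEdge()/addCombiner() above: the same combiner configuration is
attached to both the InputDescriptor and the OutputDescriptor, matching the
TezEdgeDescriptor comment below that the combiner runs on both sides of an
edge; the Tez runtime can then invoke it during the map-side sort/spill and
again during the reduce-side merge. A hedged sketch of the conf assembled for
such an edge (keys copied from addCombiner(); serialized values elided):

    Configuration conf = new Configuration();
    conf.set(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
    conf.set(MRJobConfig.COMBINE_CLASS_ATTR, PigCombiner.Combine.class.getName());
    // plus the serialized objects under "pig.pigContext", "pig.combinePlan",
    // "pig.combine.package" and "pig.map.keytype"
    byte[] payload = TezUtils.createUserPayloadFromConf(conf);
    in.setUserPayload(payload);   // combine while merging shuffled input
    out.setUserPayload(payload);  // combine while spilling sorted output
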
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java?rev=1540892&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java Tue Nov 12 00:20:00 2013
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+
+/**
+ * Descriptor for a Tez edge. It holds the combine plan as well as the edge properties.
+ */
+public class TezEdgeDescriptor {
+    // The combiner runs on both the input and the output side of a Tez edge.
+    public PhysicalPlan combinePlan;
+
+    public String inputClassName;
+    public String outputClassName;
+    public SchedulingType schedulingType;
+    public DataSourceType dataSourceType;
+    public DataMovementType dataMovementType;
+
+    public TezEdgeDescriptor() {
+        combinePlan = new PhysicalPlan();
+
+        // The default is a shuffle edge.
+        inputClassName = ShuffledMergedInput.class.getName();
+        outputClassName = OnFileSortedOutput.class.getName();
+        schedulingType = SchedulingType.SEQUENTIAL;
+        dataSourceType = DataSourceType.PERSISTED;
+        dataMovementType = DataMovementType.SCATTER_GATHER;
+    }
+}

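Since the descriptor's fields are public, later edge types can presumably be
obtained by mutating a descriptor rather than subclassing. A sketch under that
assumption; ONE_TO_ONE is a real DataMovementType, but the input/output class
names below are hypothetical placeholders:

    TezEdgeDescriptor edge = new TezEdgeDescriptor();
    // Turn the default shuffle edge into a 1:1 forwarding edge.
    edge.dataMovementType = DataMovementType.ONE_TO_ONE;
    edge.inputClassName = "org.example.tez.UnsortedKVInput";    // hypothetical
    edge.outputClassName = "org.example.tez.UnsortedKVOutput";  // hypothetical
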
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1540892&r1=1540891&r2=1540892&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Tue Nov 12 00:20:00 2013
@@ -29,11 +29,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.UDFContext;
@@ -45,51 +47,53 @@ import org.apache.tez.dag.api.TezConfigu
  * Main class that launches pig for Tez
  */
 public class TezLauncher extends Launcher {
-    
+
     private static final Log log = LogFactory.getLog(TezLauncher.class);
-    
+    private boolean aggregateWarning = false;
+
     @Override
     public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
+        aggregateWarning = Boolean.parseBoolean(pc.getProperties().getProperty("aggregate.warning", "false"));
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
 
         FileSystem fs = FileSystem.get(conf);
         Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
-        
+
         TezResourceManager.initialize(stagingDir, pc, conf);
 
         conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
-        
+
         List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>();
-        
+
         TezStats tezStats = new TezStats(pc);
         PigStats.start(tezStats);
-        
+
         TezJobControlCompiler jcc = new TezJobControlCompiler(pc, conf);
         TezPlanContainer tezPlanContainer = compile(php, pc);
-        
+
         TezOperPlan tezPlan;
-        
+
         while ((tezPlan=tezPlanContainer.getNextPlan(processedPlans))!=null) {
             processedPlans.add(tezPlan);
 
             TezPOPackageAnnotator pkgAnnotator = new TezPOPackageAnnotator(tezPlan);
             pkgAnnotator.visit();
-    
+
             tezStats.initialize(tezPlan);
-    
+
             jc = jcc.compile(tezPlan, grpName, conf, tezPlanContainer);
             TezJobNotifier notifier = new TezJobNotifier(tezPlanContainer, tezPlan);
             ((TezJobControl)jc).setJobNotifier(notifier);
             ((TezJobControl)jc).setTezStats(tezStats);
-    
+
             // Initially, all jobs are in wait state.
             List<ControlledJob> jobsWithoutIds = jc.getWaitingJobList();
             log.info(jobsWithoutIds.size() + " tez job(s) waiting for submission.");
-    
+
             // TODO: MapReduceLauncher does a couple of things here. For example,
             // notify PPNL of job submission, update PigStats, etc. We will worry
             // about them later.
-    
+
             // Set the thread UDFContext so registered classes are available.
             final UDFContext udfContext = UDFContext.getUDFContext();
             Thread jcThread = new Thread(jc, "JobControl") {
@@ -99,11 +103,11 @@ public class TezLauncher extends Launche
                     super.run();
                 }
             };
-    
+
             JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
             jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
             jcThread.setContextClassLoader(PigContext.getClassLoader());
-    
+
             // Mark the times that the jobs were submitted so it's reflected in job
             // history props
             long scriptSubmittedTimestamp = System.currentTimeMillis();
@@ -115,7 +119,7 @@ public class TezLauncher extends Launche
                 jobConf.set("pig.job.submitted.timestamp",
                         Long.toString(System.currentTimeMillis()));
             }
-    
+
             // All the setup done, now lets launch the jobs. DAG is submitted to
             // YARN cluster by TezJob.submit().
             jcThread.start();
@@ -145,9 +149,18 @@ public class TezLauncher extends Launche
     public TezPlanContainer compile(PhysicalPlan php, PigContext pc)
             throws PlanException, IOException, VisitorException {
         TezCompiler comp = new TezCompiler(php, pc);
-        comp.compile();
-        TezOperPlan plan = comp.getTezPlan();
-        // TODO: Run optimizations here
+        TezOperPlan tezPlan = comp.compile();
+        boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
+                PigConfiguration.PROP_NO_COMBINER, "false"));
+
+        // Run CombinerOptimizer on Tez plan
+        if (!pc.inIllustrator && !nocombiner) {
+            boolean doMapAgg = Boolean.parseBoolean(pc.getProperties().getProperty(
+                    PigConfiguration.PROP_EXEC_MAP_PARTAGG, "false"));
+            CombinerOptimizer co = new CombinerOptimizer(tezPlan, doMapAgg);
+            co.visit();
+            co.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log);
+        }
         return comp.getPlanContainer();
     }
 
@@ -179,4 +192,3 @@ public class TezLauncher extends Launche
         log.info("Cannot find job: " + jobID);
     }
 }
-

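The two properties consulted in compile() mirror the MR code path. Assuming
PigConfiguration.PROP_NO_COMBINER and PROP_EXEC_MAP_PARTAGG resolve to the same
keys as on MR ("pig.exec.nocombiner" and "pig.exec.mapPartAgg"), a client could
steer the optimizer like this:

    Properties props = pc.getProperties();
    // Skip combiner optimization entirely:
    props.setProperty(PigConfiguration.PROP_NO_COMBINER, "true");
    // Or keep it and ask for in-map partial aggregation as well:
    props.setProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, "true");
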
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1540892&r1=1540891&r2=1540892&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Tue Nov 12 00:20:00 2013
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.ByteArrayOutputStream;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -26,6 +27,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /**
@@ -35,8 +37,13 @@ import com.google.common.collect.Sets;
 public class TezOperator extends Operator<TezOpPlanVisitor> {
     private static final long serialVersionUID = 1L;
 
+    // Processor pipeline
     public PhysicalPlan plan;
-    public PhysicalPlan combinePlan;
+
+    // Descriptors for out-bound edges.
+    public Map<OperatorKey, TezEdgeDescriptor> outEdges;
+    // Descriptors for in-bound edges.
+    public Map<OperatorKey, TezEdgeDescriptor> inEdges;
 
     public Set<String> UDFs;
     public Set<PhysicalOperator> scalars;
@@ -76,7 +83,8 @@ public class TezOperator extends Operato
     public TezOperator(OperatorKey k) {
         super(k);
         plan = new PhysicalPlan();
-        combinePlan = new PhysicalPlan();
+        outEdges = Maps.newHashMap();
+        inEdges = Maps.newHashMap();
         UDFs = Sets.newHashSet();
         scalars = Sets.newHashSet();
     }
@@ -175,7 +183,7 @@ public class TezOperator extends Operato
         sb.delete(sb.length() - "\n".length(), sb.length());
         return sb.toString();
     }
-    
+
     public boolean needSegmentBelow() {
         return segmentBelow;
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java?rev=1540892&r1=1540891&r2=1540892&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java Tue Nov 12 00:20:00 2013
@@ -18,11 +18,13 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.PrintStream;
+import java.util.Map.Entry;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -49,23 +51,26 @@ public class TezPrinter extends TezOpPla
     @Override
     public void visitTezOp(TezOperator tezOper) throws VisitorException {
         mStream.println("Tez vertex " + tezOper.getOperatorKey().toString());
+        if (tezOper.inEdges.size() > 0) {
+            for (Entry<OperatorKey, TezEdgeDescriptor> inEdge : tezOper.inEdges.entrySet()) {
+                if (!inEdge.getValue().combinePlan.isEmpty()) {
+                    mStream.println("# Combine plan on edge <" + inEdge.getKey() + ">");
+                    PlanPrinter<PhysicalOperator, PhysicalPlan> printer =
+                            new PlanPrinter<PhysicalOperator, PhysicalPlan>(
+                                    inEdge.getValue().combinePlan, mStream);
+                    printer.setVerbose(isVerbose);
+                    printer.visit();
+                    mStream.println();
+                }
+            }
+        }
         if (tezOper.plan != null && tezOper.plan.size() > 0) {
+            mStream.println("# Plan on vertex");
             PlanPrinter<PhysicalOperator, PhysicalPlan> printer =
                     new PlanPrinter<PhysicalOperator, PhysicalPlan>(tezOper.plan, mStream);
             printer.setVerbose(isVerbose);
             printer.visit();
-        }
-        if (tezOper.combinePlan != null && tezOper.combinePlan.size() > 0) {
             mStream.println();
-            mStream.println("------------");
-            mStream.println("Combine Plan");
-            mStream.println("------------");
-            PlanPrinter<PhysicalOperator, PhysicalPlan> printer =
-                    new PlanPrinter<PhysicalOperator, PhysicalPlan>(tezOper.combinePlan, mStream);
-            printer.setVerbose(isVerbose);
-            printer.visit();
         }
-        mStream.println();
     }
 }
-
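
With the printer changes above, explain output for a combiner-enabled plan
should look roughly like the following (shape inferred from the println calls;
operator keys and plan bodies are placeholders):

    Tez vertex scope-21
    # Combine plan on edge <scope-17>
    <physical operators of the combine plan>

    # Plan on vertex
    <physical operators of the vertex plan>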