You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/12/19 04:58:20 UTC
svn commit: r1050757 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
test/org/apache/pig/test/
Author: thejas
Date: Sun Dec 19 03:58:20 2010
New Revision: 1050757
URL: http://svn.apache.org/viewvc?rev=1050757&view=rev
Log:
PIG-750: Use combiner when algebraic UDFs are used in expressions
PIG-490: Combiner not used when group elements referred to in
tuple notation instead of flatten.
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
pig/trunk/test/org/apache/pig/test/TestCombiner.java
pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1050757&r1=1050756&r2=1050757&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Dec 19 03:58:20 2010
@@ -24,6 +24,11 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-750: Use combiner when algebraic UDFs are used in expressions (thejas)
+
+PIG-490: Combiner not used when group elements referred to in
+ tuple notation instead of flatten. (thejas)
+
PIG-1768: 09 docs: illustrate (changec via olgan)
PIG-1768: docs reorg (changec via olgan)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=1050757&r1=1050756&r2=1050757&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Sun Dec 19 03:58:20 2010
@@ -18,7 +18,9 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,6 +33,7 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
@@ -54,46 +57,29 @@ import org.apache.pig.impl.plan.PlanWalk
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.util.Pair;
/**
* Optimize map reduce plans to use the combiner where possible.
- * Currently Foreach is copied to the combiner phase if it does not contain a
- * nested plan and all UDFs in the generate statement are algebraic.
- * The version of the foreach in the combiner
- * stage will use the initial function, and the version in the reduce stage
- * will be changed to use the final function.
- *
+ * Algebraic functions and distinct in nested plan of a foreach are partially
+ * computed in the map and combine phase.
+ * A new foreach statement with initial and intermediate forms of algebraic
+ * functions are added to map and combine plans respectively.
+ *
+ * If bag portion of group-by result is projected or a non algebraic
+ * expression/udf has bag as input, combiner will not be used. This is because
+ * the use of combiner in such case is likely to degrade performance
+ * as there will not be much reduction in data size in combine stage to
+ * offset the cost of the additional number of times (de)serialization is done.
+ *
+ *
* Major areas for enhancement:
- * 1) Currently, scripts such as:
- * B = group A by $0;
- * C = foreach B {
- * C1 = distinct A;
- * generate group, COUNT(C1);
- * }
- * do not use the combiner. The issue is being able to properly decompose
- * the expression in the UDF's plan. The current code just takes whatever is
- * the argument to the algebraic UDF and replaces it with a project. This
- * works for things like generate group, SUM(A.$1 + 1). But it fails for
- * things like the above. Certain types of inner plans will never be
- * movable (like filters). But distinct or order by in the inner plan
- * should be mobile. And, things like:
- * C = cogroup A by $0, B by $0;
- * D = foreach C {
- * D1 = distinct A;
- * D2 = distinct B;
- * generate UDF(D1 + D2);
- * }
- * make it even harder. The first step is probably just to handle queries
- * like the first above, as they will probably be the most common.
+ * 1. use of combiner in cogroup
+ * 2. queries with order-by, limit or sort in a nested foreach after group-by
+ * 3. case where group-by is followed by filter that has algebraic expression
*
- * 2) Scripts such as:
- * B = group A by $0;
- * C = foreach B generate algebraic(A), nonalgebraic(A);
- * currently aren't moved into the combiner, even though they could be.
- * Again, the trick here is properly decomposing the plan since A may be more
- * than a simply projection.
+ *
*
- * #2 should probably be the next area of focus.
*
*/
public class CombinerOptimizer extends MROpPlanVisitor {
@@ -102,18 +88,7 @@ public class CombinerOptimizer extends M
private Log log = LogFactory.getLog(getClass());
- private enum ExprType { SIMPLE_PROJECT, ALGEBRAIC, NOT_ALGEBRAIC,
- DISTINCT };
-
- private int mKeyField = -1;
-
- // This array tracks the positions of the group key in the output tuples
- // of the foreach clause. This needs to be revisited when combiner optimizer
- // supports foreach output with parts of group key (e.g. group.$0).
- private boolean[] keyFieldPositions;
- private byte mKeyType = 0;
-
private CompilationMessageCollector messageCollector = null;
public CombinerOptimizer(MROperPlan plan, String chunkSize) {
@@ -122,21 +97,21 @@ public class CombinerOptimizer extends M
}
public CombinerOptimizer(MROperPlan plan, String chunkSize,
- CompilationMessageCollector messageCollector) {
+ CompilationMessageCollector messageCollector) {
super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
this.messageCollector = messageCollector ;
}
-
+
public CompilationMessageCollector getMessageCollector() {
- return messageCollector;
+ return messageCollector;
}
@Override
public void visitMROp(MapReduceOper mr) throws VisitorException {
- resetState();
log.trace("Entering CombinerOptimizer.visitMROp");
if (mr.reducePlan.isEmpty()) return;
+ // part one - check if this MR job represents a group-by + foreach
// Find the POLocalRearrange in the map. I'll need it later.
List<PhysicalOperator> mapLeaves = mr.mapPlan.getLeaves();
if (mapLeaves == null || mapLeaves.size() != 1) {
@@ -151,7 +126,7 @@ public class CombinerOptimizer extends M
List<PhysicalOperator> reduceRoots = mr.reducePlan.getRoots();
if (reduceRoots.size() != 1) {
- messageCollector.collect("Expected reduce to have single leaf", MessageType.Warning, PigWarning.MULTI_LEAF_REDUCE);
+ messageCollector.collect("Expected reduce to have single leaf", MessageType.Warning, PigWarning.MULTI_LEAF_REDUCE);
return;
}
@@ -159,7 +134,7 @@ public class CombinerOptimizer extends M
// not, I don't know what's going on, so I'm out of here.
PhysicalOperator root = reduceRoots.get(0);
if (!(root instanceof POPackage)) {
- messageCollector.collect("Expected reduce root to be a POPackage", MessageType.Warning, PigWarning.NON_PACKAGE_REDUCE_PLAN_ROOT);
+ messageCollector.collect("Expected reduce root to be a POPackage", MessageType.Warning, PigWarning.NON_PACKAGE_REDUCE_PLAN_ROOT);
return;
}
POPackage pack = (POPackage)root;
@@ -169,7 +144,6 @@ public class CombinerOptimizer extends M
if (packSuccessors == null || packSuccessors.size() != 1) return;
PhysicalOperator successor = packSuccessors.get(0);
- // Need to check if this is a distinct.
if (successor instanceof POFilter) {
/*
Later
@@ -184,128 +158,384 @@ public class CombinerOptimizer extends M
} else if (algebraic(filterInner)) {
// TODO Duplicate filter to combiner
}
- */
+ */
} else if (successor instanceof POForEach) {
POForEach foreach = (POForEach)successor;
List<PhysicalPlan> feInners = foreach.getInputPlans();
- List<ExprType> ap = algebraic(feInners, foreach.getToBeFlattened());
- if (ap != null) {
- log.info("Choosing to move algebraic foreach to combiner");
-
- // Need to insert two new foreachs - one in the combine
- // and one in the map plan which will be based on the reduce foreach.
- // The map foreach will have one inner plan for each
- // inner plan in the foreach we're duplicating. For
- // projections, the plan will be the same. For algebraic
- // udfs, the plan will have the initial version of the function.
-
- // The combine foreach will have one inner plan for each
- // inner plan in the foreach we're duplicating. For
- // projections, the project operators will be changed to
- // project the same column as its position in the
- // foreach. For algebraic udfs, the plan will have the
- // intermediate version of the function. The input to the
- // udf will be a POProject which will project the column
- // corresponding to the position of the udf in the foreach
-
- // In the inner plans of the reduce foreach for
- // projections, the project operators will be changed to
- // project the same column as its position in the
- // foreach. For algebraic udfs, the plan will have the
- // final version of the function. The input to the
- // udf will be a POProject which will project the column
- // corresponding to the position of the udf in the foreach
- if (mr.combinePlan.getRoots().size() != 0) {
- messageCollector.collect("Wasn't expecting to find anything already "
+
+ // find algebraic operators and also check if the foreach statement
+ // is suitable for combiner use
+ List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps =
+ findAlgebraicOps(feInners);
+ if(algebraicOps == null || algebraicOps.size() == 0){
+ // the plan is not combinable or there is nothing to combine
+ //we're done
+ return;
+ }
+ if (mr.combinePlan.getRoots().size() != 0) {
+ messageCollector.collect("Wasn't expecting to find anything already "
+ "in the combiner!", MessageType.Warning, PigWarning.NON_EMPTY_COMBINE_PLAN);
- return;
+ return;
+ }
+
+ log.info("Choosing to move algebraic foreach to combiner");
+
+ try {
+
+
+ // replace PODistinct->Project[*] with distinct udf (which is Algebraic)
+ for(Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps ){
+ if(! (op2plan.first instanceof PODistinct))
+ continue;
+ DistinctPatcher distinctPatcher = new DistinctPatcher(op2plan.second);
+ distinctPatcher.visit();
+ if(distinctPatcher.getDistinct() == null){
+ int errCode = 2073;
+ String msg = "Problem with replacing distinct operator with distinct built-in function.";
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+ op2plan.first = distinctPatcher.getDistinct();
}
- mr.combinePlan = new PhysicalPlan();
- try {
- // If we haven't already found the key (and thus the
- // key type) we need to figure out the key type now.
- if (mKeyType == 0) {
- mKeyType = rearrange.getKeyType();
+
+ //create new map foreach
+ POForEach mfe = createForEachWithGrpProj(foreach, rearrange.getKeyType());
+ Map<PhysicalOperator, Integer> op2newpos =
+ new HashMap<PhysicalOperator, Integer>();
+ Integer pos = 1;
+ //create plan for each algebraic udf and add as inner plan in map-foreach
+ for(Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps ){
+ PhysicalPlan udfPlan = createPlanWithPredecessors(op2plan.first, op2plan.second);
+ mfe.addInputPlan(udfPlan, false);
+ op2newpos.put(op2plan.first, pos++);
+ }
+ changeFunc(mfe, POUserFunc.INITIAL);
+
+ // since we will only be creating SingleTupleBag as input to
+ // the map foreach, we should flag the POProjects in the map
+ // foreach inner plans to also use SingleTupleBag
+ for (PhysicalPlan mpl : mfe.getInputPlans()) {
+ try {
+ new fixMapProjects(mpl).visit();
+ } catch (VisitorException e) {
+ int errCode = 2089;
+ String msg = "Unable to flag project operator to use single tuple bag.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
}
+ }
+
+ //create new combine foreach
+ POForEach cfe = createForEachWithGrpProj(foreach, rearrange.getKeyType());
+ //add algebraic functions with appropriate projection
+ addAlgebraicFuncToCombineFE(cfe, op2newpos);
+ changeFunc(cfe, POUserFunc.INTERMEDIATE);
+
+ //fix projection and function time for algebraic functions in reduce foreach
+ for(Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps ){
+ setProjectInput(op2plan.first, op2plan.second, op2newpos.get(op2plan.first));
+ ((POUserFunc)op2plan.first).setAlgebraicFunction(POUserFunc.FINAL);
+ }
+
+
+ // we have modified the foreach inner plans - so set them
+ // again for the foreach so that foreach can do any re-initialization
+ // around them.
+ // FIXME - this is a necessary evil right now because the leaves are explicitly
+ // stored in the POForeach as a list rather than computed each time at
+ // run time from the plans for optimization. Do we want to have the Foreach
+ // compute the leaves each time and have Java optimize it (will Java optimize?)?
+ mfe.setInputPlans(mfe.getInputPlans());
+ cfe.setInputPlans(cfe.getInputPlans());
+ foreach.setInputPlans(foreach.getInputPlans());
+
+ //tell POCombinerPackage which fields need projected and
+ // which placed in bags. First field is simple project
+ // rest need to go into bags
+ int numFields = algebraicOps.size() + 1; // algebraic funcs + group key
+ boolean[] bags = new boolean[numFields];
+ bags[0] = false;
+ for (int i = 1; i < numFields; i++) {
+ bags[i] = true;
+ }
+
+ // Use the POCombiner package in the combine plan
+ // as it needs to act differently than the regular
+ // package operator.
+ mr.combinePlan = new PhysicalPlan();
+ POCombinerPackage combinePack =
+ new POCombinerPackage(pack, bags);
+ mr.combinePlan.add(combinePack);
+ mr.combinePlan.add(cfe);
+ mr.combinePlan.connect(combinePack, cfe);
+ // No need to connect projections in cfe to cp, because
+ // PigCombiner directly attaches output from package to
+ // root of remaining plan.
+
+ POLocalRearrange mlr = getNewRearrange(rearrange);
+
+
+ // A specialized local rearrange operator will replace
+ // the normal local rearrange in the map plan. This behaves
+ // like the regular local rearrange in the getNext()
+ // as far as getting its input and constructing the
+ // "key" out of the input. It then returns a tuple with
+ // two fields - the key in the first position and the
+ // "value" inside a bag in the second position. This output
+ // format resembles the format out of a Package. This output
+ // will feed to the map foreach which expects this format.
+ // If the key field isn't in the project of the combiner or map foreach,
+ // it is added to the end (This is required so that we can
+ // set up the inner plan of the new Local Rearrange leaf in the map
+ // and combine plan to contain just the project of the key).
+ patchUpMap(mr.mapPlan, getPreCombinerLR(rearrange), mfe, mlr);
+ POLocalRearrange clr = getNewRearrange(rearrange);
+
+ mr.combinePlan.add(clr);
+ mr.combinePlan.connect(cfe, clr);
+
+ // Change the package operator in the reduce plan to
+ // be the POCombiner package, as it needs to act
+ // differently than the regular package operator.
+ POCombinerPackage newReducePack =
+ new POCombinerPackage(pack, bags);
+ mr.reducePlan.replace(pack, newReducePack);
+
+ // the replace() above only changes
+ // the plan and does not change "inputs" to
+ // operators
+ // set up "inputs" for the operator after
+ // package correctly
+ List<PhysicalOperator> packList = new ArrayList<PhysicalOperator>();
+ packList.add(newReducePack);
+ List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(newReducePack);
+ // there should be only one successor to package
+ sucs.get(0).setInputs(packList);
+ } catch (Exception e) {
+ int errCode = 2018;
+ String msg = "Internal error. Unable to introduce the combiner for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ }
+
+
+ /**
+ * find algebraic operators and also check if the foreach statement
+ * is suitable for combiner use
+ * @param feInners inner plans of foreach
+ * @return null if plan is not combinable, otherwise list of combinable operators
+ * @throws VisitorException
+ */
+ private List<Pair<PhysicalOperator, PhysicalPlan>>
+ findAlgebraicOps(List<PhysicalPlan> feInners)
+ throws VisitorException {
+ ArrayList<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = new ArrayList<Pair<PhysicalOperator, PhysicalPlan>>();
+
+ //check each foreach inner plan
+ for(PhysicalPlan pplan : feInners){
+ //check for presence of non combinable operators
+ AlgebraicPlanChecker algChecker = new AlgebraicPlanChecker(pplan);
+ algChecker.visit();
+ if(algChecker.sawNonAlgebraic){
+ return null;
+ }
+
+ //if we found a combinable distinct add that to list
+ if(algChecker.sawDistinctAgg){
+ algebraicOps.add(new Pair<PhysicalOperator, PhysicalPlan>(algChecker.getDistinct(), pplan));
+ continue;
+ }
+
- POForEach mfe = foreach.clone();
- POForEach cfe = foreach.clone();
- fixUpForeachs(mfe, cfe, foreach, ap);
-
-
- // Use the ExprType list returned from algebraic to tell
- // POCombinerPackage which fields need projected and
- // which placed in bags.
- int numFields = (mKeyField >= ap.size()) ? mKeyField + 1 :
- ap.size();
- boolean[] bags = new boolean[numFields];
- for (int i = 0; i < ap.size(); i++) {
- if (ap.get(i) == ExprType.SIMPLE_PROJECT) bags[i] = false;
- else bags[i] = true;
+ List<PhysicalOperator> roots = pplan.getRoots();
+ //combinable operators have to be attached to POProject root(s)
+ // if root does not have a successor that is combinable, the project
+ // has to be projecting the group column . Otherwise this MR job
+ //is considered not combinable as we don't want to use combiner for
+ // cases where this foreach statement is projecting bags (likely to
+ // be bad for performance because of additional (de)serialization costs)
+
+ for(PhysicalOperator root : roots){
+ if(root instanceof ConstantExpression){
+ continue;
+ }
+ if(! (root instanceof POProject)){
+ // how can this happen? - expect root of inner plan to be
+ // constant or project. not combining it
+ //TODO: Warn
+ return null;
+ }
+ POProject proj = (POProject)root;
+ POUserFunc combineUdf = getAlgebraicSuccessor(proj, pplan);
+ if(combineUdf == null){
+ // Check to see if this is a projection of the grouping column.
+ // If so, it will be a projection of col 0
+ List<Integer> cols = proj.getColumns();
+ if (cols != null && cols.size() == 1 && cols.get(0) == 0) {
+ //it is project of grouping column, so the plan is still
+ //combinable
+ continue;
+ }else{
+ //not combinable
+ return null;
}
- bags[mKeyField] = false;
- // Use the POCombiner package in the combine plan
- // as it needs to act differently than the regular
- // package operator.
- POCombinerPackage combinePack =
- new POCombinerPackage(pack, bags, keyFieldPositions);
- mr.combinePlan.add(combinePack);
- mr.combinePlan.add(cfe);
- mr.combinePlan.connect(combinePack, cfe);
- // No need to connect projections in cfe to cp, because
- // PigCombiner directly attaches output from package to
- // root of remaining plan.
-
- POLocalRearrange mlr = rearrange.clone();
- fixUpRearrange(mlr);
-
- // A specialized local rearrange operator will replace
- // the normal local rearrange in the map plan. This behaves
- // like the regular local rearrange in the getNext()
- // as far as getting its input and constructing the
- // "key" out of the input. It then returns a tuple with
- // two fields - the key in the first position and the
- // "value" inside a bag in the second position. This output
- // format resembles the format out of a Package. This output
- // will feed to the map foreach which expects this format.
- // If the key field isn't in the project of the combiner or map foreach,
- // it is added to the end (This is required so that we can
- // set up the inner plan of the new Local Rearrange leaf in the map
- // and combine plan to contain just the project of the key).
- patchUpMap(mr.mapPlan, getPreCombinerLR(rearrange), mfe, mlr);
- POLocalRearrange clr = rearrange.clone();
- fixUpRearrange(clr);
-
- mr.combinePlan.add(clr);
- mr.combinePlan.connect(cfe, clr);
-
- // Change the package operator in the reduce plan to
- // be the POCombiner package, as it needs to act
- // differently than the regular package operator.
- POCombinerPackage newReducePack =
- new POCombinerPackage(pack, bags, keyFieldPositions);
- mr.reducePlan.replace(pack, newReducePack);
-
- // the replace() above only changes
- // the plan and does not change "inputs" to
- // operators
- // set up "inputs" for the operator after
- // package correctly
- List<PhysicalOperator> packList = new ArrayList<PhysicalOperator>();
- packList.add(newReducePack);
- List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(newReducePack);
- // there should be only one successor to package
- sucs.get(0).setInputs(packList);
- } catch (Exception e) {
- int errCode = 2018;
- String msg = "Internal error. Unable to introduce the combiner for optimization.";
- throw new OptimizerException(msg, errCode, PigException.BUG, e);
}
+
+ algebraicOps.add(new Pair<PhysicalOperator, PhysicalPlan>(combineUdf, pplan));
+
}
}
+
+ return algebraicOps;
}
/**
+ * Look for an algebraic POUserFunc as successor to this project, called
+ * recursively to skip any other projects seen on the way.
+ * @param proj project
+ * @param pplan physical plan
+ * @return null if any operator other than POProject or algebraic POUserFunc is
+ * found while going down the plan, otherwise algebraic POUserFunc is returned
+ */
+ private POUserFunc getAlgebraicSuccessor(POProject proj, PhysicalPlan pplan) {
+ //check if root is followed by combinable operator
+ List<PhysicalOperator> succs = pplan.getSuccessors(proj);
+ if(succs == null || succs.size() == 0){
+ return null;
+ }
+ if(succs.size() > 1){
+ //project shared by more than one operator - does not happen
+ // in plans generated today
+ // won't try to combine this
+ return null;
+ }
+
+
+ PhysicalOperator succ = succs.get(0);
+ if(succ instanceof POProject){
+ return getAlgebraicSuccessor((POProject) succ, pplan);
+ }
+
+ if(succ instanceof POUserFunc && ((POUserFunc)succ).combinable() ){
+ return (POUserFunc)succ;
+ }
+
+ //some other operator ? can't combine
+ return null;
+ }
+
+
+ /**
+ * Create a new foreach with same scope,alias as given foreach
+ * add an inner plan that projects the group column, which is going to be
+ * the first input
+ * @param foreach source foreach
+ * @param keyType type for group-by key
+ * @return new POForeach
+ */
+ private POForEach createForEachWithGrpProj(POForEach foreach, byte keyType) {
+ String scope = foreach.getOperatorKey().scope;
+ POForEach newFE = new POForEach(createOperatorKey(scope), new ArrayList<PhysicalPlan>());
+ newFE.setAlias(foreach.getAlias());
+ newFE.setResultType(foreach.getResultType());
+ //create plan that projects the group column
+ PhysicalPlan grpProjPlan = new PhysicalPlan();
+ //group by column is the first column
+ POProject proj = new POProject(createOperatorKey(scope), 1, 0);
+ proj.setResultType(keyType);
+ grpProjPlan.add(proj);
+
+ newFE.addInputPlan(grpProjPlan, false);
+ return newFE;
+ }
+
+ /**
+ * Create new plan and add to it the clones of operator algeOp and its
+ * predecessors from the physical plan pplan .
+ * @param algeOp algebraic operator
+ * @param pplan physical plan that has algeOp
+ * @return new plan
+ * @throws CloneNotSupportedException
+ * @throws PlanException
+ */
+ private PhysicalPlan createPlanWithPredecessors(PhysicalOperator algeOp, PhysicalPlan pplan)
+ throws CloneNotSupportedException, PlanException {
+ PhysicalPlan newplan = new PhysicalPlan();
+ addPredecessorsToPlan(algeOp, pplan, newplan);
+ return newplan;
+ }
+
+ /**
+ * Recursively clone op and its predecessors from pplan and add them to newplan
+ * @param op
+ * @param pplan
+ * @param newplan
+ * @return
+ * @throws CloneNotSupportedException
+ * @throws PlanException
+ */
+ private PhysicalOperator addPredecessorsToPlan(PhysicalOperator op, PhysicalPlan pplan,
+ PhysicalPlan newplan)
+ throws CloneNotSupportedException, PlanException {
+ PhysicalOperator newOp = op.clone();
+ newplan.add(newOp);
+ if(pplan.getPredecessors(op) == null || pplan.getPredecessors(op).size() == 0){
+ return newOp;
+ }
+ for(PhysicalOperator pred : pplan.getPredecessors(op)){
+ PhysicalOperator newPred = addPredecessorsToPlan(pred, pplan, newplan);
+ newplan.connect(newPred, newOp);
+ }
+ return newOp;
+ }
+
+
+
+
+ /**
+ * add algebraic functions with appropriate projection to new foreach in combiner
+ * @param cfe - the new foreach in combiner
+ * @param op2newpos - mapping of physical operator to position in input
+ * @throws CloneNotSupportedException
+ * @throws PlanException
+ */
+ private void addAlgebraicFuncToCombineFE(POForEach cfe, Map<PhysicalOperator, Integer> op2newpos)
+ throws CloneNotSupportedException, PlanException {
+
+ //an array that we will first populate with physical operators in order
+ //of their position in input. Used while adding plans to combine foreach
+ // just so that output of combine foreach has the same positions as input. That
+ // means the same operator to position mapping can be used by reduce as well
+ PhysicalOperator[] opsInOrder = new PhysicalOperator[op2newpos.size() + 1];
+ for(Map.Entry<PhysicalOperator, Integer> op2pos : op2newpos.entrySet()){
+ opsInOrder[op2pos.getValue()] = op2pos.getKey();
+ }
+
+ // first position is used by group column and a plan has been added for it,
+ //so start with 1
+ for(int i=1; i < opsInOrder.length; i++){
+ //create new inner plan for foreach
+ //add cloned copy of given physical operator and a new project.
+ // Even if the udf in query takes multiple input, only one project
+ // needs to be added because input to this udf
+ //will be the INITIAL version of udf evaluated in map.
+ PhysicalPlan newPlan = new PhysicalPlan();
+ PhysicalOperator newOp = opsInOrder[i].clone();
+ newPlan.add(newOp);
+ POProject proj = new POProject(
+ createOperatorKey(cfe.getOperatorKey().getScope()),
+ 1, i
+ );
+ proj.setResultType(DataType.BAG);
+ newPlan.add(proj);
+ newPlan.connect(proj, newOp);
+ cfe.addInputPlan(newPlan, false);
+ }
+ }
+
+ /**
+ * Replace old POLocalRearrange with new pre-combine LR,
+ * add new map foreach, new map-local-rearrange, and connect them
+ *
* @param mapPlan
* @param preCombinerLR
* @param mfe
@@ -314,13 +544,13 @@ public class CombinerOptimizer extends M
*/
private void patchUpMap(PhysicalPlan mapPlan, POPreCombinerLocalRearrange preCombinerLR,
POForEach mfe, POLocalRearrange mlr) throws PlanException {
-
+
POLocalRearrange oldLR = (POLocalRearrange)mapPlan.getLeaves().get(0);
mapPlan.replace(oldLR, preCombinerLR);
-
+
mapPlan.add(mfe);
mapPlan.connect(preCombinerLR, mfe);
-
+
mapPlan.add(mlr);
mapPlan.connect(mfe, mlr);
}
@@ -330,305 +560,19 @@ public class CombinerOptimizer extends M
* @return
*/
private POPreCombinerLocalRearrange getPreCombinerLR(POLocalRearrange rearrange) {
-
+
String scope = rearrange.getOperatorKey().scope;
POPreCombinerLocalRearrange pclr = new POPreCombinerLocalRearrange(
- new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)),
+ createOperatorKey(scope),
rearrange.getRequestedParallelism(), rearrange.getInputs());
pclr.setPlans(rearrange.getPlans());
return pclr;
}
- /*
- private boolean onKeysOnly(PhysicalPlan pp) {
- // TODO
- return false;
- }
- */
-
- // At some point in the future we can expand to deconstructing
- // non-algebraic expressions to find an algebraic or projected root. For
- // example, given a query:
- // foreach b generate group, algebraic(a), nonalgebraic(a)
- // this could be transformed to:
- // combiner: foreach group, initial(a), a
- // reducer: foreach group, final(a), nonalgebraic(a)
- // This code doesn't do this now, because deconstructing expressions is
- // tricky.
- private List<ExprType> algebraic(
- List<PhysicalPlan> plans,
- List<Boolean> flattens) throws VisitorException {
- List<ExprType> types = new ArrayList<ExprType>(plans.size());
- boolean atLeastOneAlgebraic = false;
- boolean noNonAlgebraics = true;
- keyFieldPositions = new boolean[plans.size()];
- for (int i = 0; i < plans.size(); i++) {
- ExprType t = algebraic(plans.get(i), flattens.get(i), i);
- types.add(t);
- atLeastOneAlgebraic |=
- (t == ExprType.ALGEBRAIC || t == ExprType.DISTINCT);
- noNonAlgebraics &= (t != ExprType.NOT_ALGEBRAIC);
- }
- if (!atLeastOneAlgebraic || !noNonAlgebraics) return null;
- else return types;
- }
-
- private ExprType algebraic(
- PhysicalPlan pp,
- Boolean toBeFlattened,
- int field) throws VisitorException {
- // A plan will be considered algebraic if
- // each element is a single field OR an algebraic UDF
- List<PhysicalOperator> leaves = pp.getLeaves();
- if (leaves == null || leaves.size() != 1) {
- // Don't know what this is, but it isn't algebraic
- return ExprType.NOT_ALGEBRAIC;
- }
-
- // Check that it doesn't have anything in the nested plan that I
- // can't make algebraic. At this point this is just filters and
- // foreach. Filters are left out because they are not necessarily
- // algebraic. Foreach is left out because it's difficult to patch
- // up the plan properly around them. This is an area for future
- // enhancement.
- AlgebraicPlanChecker apc = new AlgebraicPlanChecker(pp);
- apc.visit();
- if (apc.sawNonAlgebraic) return ExprType.NOT_ALGEBRAIC;
- if(apc.sawDistinctAgg) return ExprType.DISTINCT;
-
- // we did not see a Non algebraic or a distinct so far
- // proceed to check leaves
- PhysicalOperator leaf = leaves.get(0);
- if (leaf instanceof POProject) {
- POProject proj = (POProject)leaf;
- // Check that it's a simple project. We can't currently handle
- // things like group.$0, because that requires resetting types on
- // the reduce side.
- if (pp.getPredecessors(proj) != null) return ExprType.NOT_ALGEBRAIC;
-
- // Check if it's a projection of bag. Currently we can't use combiner
- // for statement like c = foreach b generate group, SUM(a), a;
- // where a is a bag.
- if (proj.getResultType() == DataType.BAG) return ExprType.NOT_ALGEBRAIC;
-
- // Check to see if this is a projection of the grouping column.
- // If so, it will be a projection of col 0 and will have no
- // predecessors (to avoid things like group.$0, which isn't what we
- // want).
- List<Integer> cols = proj.getColumns();
- if (cols != null && cols.size() == 1 && cols.get(0) == 0 &&
- pp.getPredecessors(proj) == null) {
- mKeyField = field;
- keyFieldPositions[field] = true;
- mKeyType = proj.getResultType();
- } else {
- // It can't be a flatten except on the grouping column
- if (toBeFlattened) return ExprType.NOT_ALGEBRAIC;
- }
- return ExprType.SIMPLE_PROJECT;
- } else if (leaf instanceof POUserFunc) {
-
- POUserFunc userFunc = (POUserFunc)leaf;
- if(!userFunc.combinable() ){
- return ExprType.NOT_ALGEBRAIC;
- }
- // The leaf userFunc may be combinable, but there might be other
- // algebraic userFuncs in the predecessors, if there are
- // we choose not to fire combiner.
- CheckCombinableUserFunc ccuf = new CheckCombinableUserFunc(pp);
- ccuf.visit();
- return ccuf.exprType;
- } else {
- return ExprType.NOT_ALGEBRAIC;
- }
- }
-
- private static class CheckCombinableUserFunc extends PhyPlanVisitor{
-
- private ExprType exprType = ExprType.ALGEBRAIC;
-
- public CheckCombinableUserFunc(PhysicalPlan plan) {
- super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
- }
-
- @Override
- public void visit() throws VisitorException {
- super.visit();
- }
-
- @Override
- public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
-
- /* We already know there is one combinable POUserFunc and its a leaf. So,
- * successor of that userFunc is null. We are interested to find
- * if there is another combinable userFunc somewhere in plan (that
- * is a userFunc with successors and is Combinable).
- */
- List<PhysicalOperator> succs = this.mPlan.getSuccessors(userFunc);
-
- if(succs != null && !succs.isEmpty() && userFunc.combinable()){
- this.exprType = ExprType.NOT_ALGEBRAIC;
- }
- }
- }
-
- // Returns number of fields that this will project, including the added
- // key field if that is necessary
- private void fixUpForeachs(
- POForEach mfe, // map foreach
- POForEach cfe, // combiner foreach
- POForEach rfe, // reducer foreach
- List<ExprType> exprs) throws PlanException {
- List<PhysicalPlan> mPlans = mfe.getInputPlans();
- List<PhysicalPlan> cPlans = cfe.getInputPlans();
- List<PhysicalPlan> rPlans = rfe.getInputPlans();
- for (int i = 0; i < exprs.size(); i++) {
- if (exprs.get(i) == ExprType.ALGEBRAIC) {
- changeFunc(mfe, mPlans.get(i), POUserFunc.INITIAL);
- changeFunc(cfe, cPlans.get(i), POUserFunc.INTERMEDIATE);
- changeFunc(rfe, rPlans.get(i), POUserFunc.FINAL);
- } else if (exprs.get(i) == ExprType.DISTINCT) {
- // A PODistinct in the plan will always have
- // a Project[bag](*) as its successor.
- // We will replace it with a POUserFunc with "Distinct" as
- // the underlying UDF.
- // In the map and combine, we will make this POUserFunc
- // the leaf of the plan by removing other operators which
- // are descendants up to the leaf.
- // In the reduce we will keep descendants intact. Further
- // down in fixProjectAndInputs we will change the inputs to
- // this POUserFunc in the combine and reduce plans to be
- // just projections of the column "i"
- PhysicalPlan[] plans = new PhysicalPlan[] {
- mPlans.get(i), cPlans.get(i), rPlans.get(i) };
- byte[] funcTypes = new byte[] { POUserFunc.INITIAL,
- POUserFunc.INTERMEDIATE, POUserFunc.FINAL };
- for (int j = 0; j < plans.length; j++) {
- DistinctPatcher dp = new DistinctPatcher(plans[j]);
- try {
- dp.visit();
- } catch (VisitorException e) {
- int errCode = 2073;
- String msg = "Problem with replacing distinct operator with distinct built-in function.";
- throw new PlanException(msg, errCode, PigException.BUG, e);
- }
-
-
- PhysicalOperator leaf = plans[j].getLeaves().get(0);
- // make the Distinct POUserFunc the leaf in the map and combine plans.
- if( j != plans.length - 1) {
- while(!((leaf instanceof POUserFunc) &&
- ((POUserFunc)leaf).getFuncSpec().getClassName().startsWith(DISTINCT_UDF_CLASSNAME))) {
- plans[j].remove(leaf);
- // get the new leaf
- leaf = plans[j].getLeaves().get(0);
- }
-
- }
- // Also set the Distinct's function to type Initial in map
- // to type Intermediate in combine plan and to type Final in
- // the reduce
- POUserFunc distinctFunc = (POUserFunc)getDistinctUserFunc(plans[j], leaf);
- try {
- distinctFunc.setAlgebraicFunction(funcTypes[j]);
- } catch (ExecException e) {
- int errCode = 2074;
- String msg = "Could not configure distinct's algebraic functions in map reduce plan.";
- throw new PlanException(msg, errCode, PigException.BUG, e);
- }
- }
-
- }
- }
-
- // since we will only be creating SingleTupleBag as input to
- // the map foreach, we should flag the POProjects in the map
- // foreach inner plans to also use SingleTupleBag
- for (PhysicalPlan mpl : mPlans) {
- try {
- new fixMapProjects(mpl).visit();
- } catch (VisitorException e) {
- int errCode = 2089;
- String msg = "Unable to flag project operator to use single tuple bag.";
- throw new PlanException(msg, errCode, PigException.BUG, e);
- }
- }
-
-
- // Set flattens for map and combiner ForEach to false
- List<Boolean> feFlattens = new ArrayList<Boolean>(cPlans.size());
- for (int i = 0; i < cPlans.size(); i++) {
- feFlattens.add(false);
- }
- mfe.setToBeFlattened(feFlattens);
- cfe.setToBeFlattened(feFlattens);
-
- // If the key field isn't in the project of the combiner or map foreach, add
- // it to the end (This is required so that we can set up the inner plan
- // of the new Local Rearrange in the map and combine plan to contain just the
- // project of the key).
- if (mKeyField == -1) {
- addKeyProject(mfe);
- addKeyProject(cfe);
- mKeyField = cPlans.size() - 1;
- keyFieldPositions = new boolean[cPlans.size()];
- keyFieldPositions[mKeyField] = true;
- }
-
- // Change the plans on the reduce/combine foreach to project from the column
- // they are in ( we just want to take output from the combine and
- // use that as input in the reduce/combine plan). UDFs will be left the same but their
- // inputs altered. Any straight projections will also be altered.
- fixProjectAndInputs(cPlans, exprs);
- fixProjectAndInputs(rPlans, exprs);
-
-
- // we have modified the foreach inner plans - so set them
- // again for the foreach so that foreach can do any re-initialization
- // around them.
- // FIXME - this is a necessary evil right now because the leaves are explicitly
- // stored in the POForeach as a list rather than computed each time at
- // run time from the plans for optimization. Do we want to have the Foreach
- // compute the leaves each time and have Java optimize it (will Java optimize?)?
- mfe.setInputPlans(mPlans);
- cfe.setInputPlans(cPlans);
- rfe.setInputPlans(rPlans);
+ private OperatorKey createOperatorKey(String scope) {
+ return new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope));
}
- /**
- * @param plans
- * @param exprs
- * @throws PlanException
- */
- private void fixProjectAndInputs(List<PhysicalPlan> plans, List<ExprType> exprs) throws PlanException {
- for (int i = 0; i < plans.size(); i++) {
- List<PhysicalOperator> leaves = plans.get(i).getLeaves();
- if (leaves == null || leaves.size() != 1) {
- int errCode = 2019;
- String msg = "Expected to find plan with single leaf. Found " + leaves.size() + " leaves.";
- throw new PlanException(msg, errCode, PigException.BUG);
- }
- PhysicalOperator leaf = leaves.get(0);
- // the combine plan could have an extra foreach inner plan
- // to project the key - so make sure we check the index
- // before looking in exprs
- if(i < exprs.size() && exprs.get(i) == ExprType.DISTINCT) {
- // if there is a distinctagg, we have to
- // look for the Distinct POUserFunc and
- // change its input to be a project of
- // column "i"
- PhysicalOperator op = getDistinctUserFunc(plans.get(i), leaf);
- setProjectInput(op, plans.get(i), i);
- } else {
- // Leaf should be either a projection or a UDF
- if (leaf instanceof POProject) {
- ((POProject)leaf).setColumn(i);
- } else if (leaf instanceof POUserFunc) {
- setProjectInput(leaf, plans.get(i), i);
- }
- }
- }
- }
/**
* @param op
@@ -639,8 +583,8 @@ public class CombinerOptimizer extends M
private void setProjectInput(PhysicalOperator op, PhysicalPlan plan, int index) throws PlanException {
String scope = op.getOperatorKey().scope;
POProject proj = new POProject(new OperatorKey(scope,
- NodeIdGenerator.getGenerator().getNextNodeId(scope)),
- op.getRequestedParallelism(), index);
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)),
+ op.getRequestedParallelism(), index);
proj.setResultType(DataType.BAG);
// Remove old connections and elements from the plan
plan.trimAbove(op);
@@ -650,80 +594,86 @@ public class CombinerOptimizer extends M
new ArrayList<PhysicalOperator>(1);
inputs.add(proj);
op.setInputs(inputs);
-
+
}
/**
- * @param plan
- * @param operator
- * @return
+ * Change the algebraic function type of the leaf UDF in each inner plan of the
+ * given foreach (used for map and combine, where the algebraic function is the plan leaf).
+ * @param fe foreach whose input plans are updated; plans with a POProject leaf are skipped
+ * @param type function type handed to POUserFunc.setAlgebraicFunction
+ * @throws PlanException if a plan lacks a single leaf, the leaf is neither project nor UDF, or the type cannot be set
*/
- private PhysicalOperator getDistinctUserFunc(PhysicalPlan plan, PhysicalOperator operator) {
- if(operator instanceof POUserFunc ) {
- if(((POUserFunc)operator).getFuncSpec().getClassName().startsWith(DISTINCT_UDF_CLASSNAME)) {
- return operator;
+ private void changeFunc(POForEach fe, byte type) throws PlanException {
+ for(PhysicalPlan plan : fe.getInputPlans()){
+ List<PhysicalOperator> leaves = plan.getLeaves();
+ if (leaves == null || leaves.size() != 1) {
+ int errCode = 2019;
+ String msg = "Expected to find plan with single leaf. Found " + (leaves == null ? 0 : leaves.size()) + " leaves.";
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+
+ PhysicalOperator leaf = leaves.get(0);
+ if(leaf instanceof POProject){
+ continue;
+ }
+ if (!(leaf instanceof POUserFunc)) {
+ int errCode = 2020;
+ String msg = "Expected to find plan with UDF or project leaf. Found " + leaf.getClass().getSimpleName();
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+
+ POUserFunc func = (POUserFunc)leaf;
+ try {
+ func.setAlgebraicFunction(type);
+ } catch (ExecException e) {
+ int errCode = 2075;
+ String msg = "Could not set algebraic function type.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
}
}
- return getDistinctUserFunc(plan, plan.getPredecessors(operator).get(0));
-
}
/**
- * @param fe
+ * create new Local rearrange by cloning existing rearrange and
+ * add plan for projecting the key
+ * @param rearrange
+ * @return
+ * @throws PlanException
+ * @throws CloneNotSupportedException
*/
- private void addKeyProject(POForEach fe) {
- PhysicalPlan newForEachInnerPlan = new PhysicalPlan();
- String scope = fe.getOperatorKey().scope;
- POProject proj = new POProject(new OperatorKey(scope,
- NodeIdGenerator.getGenerator().getNextNodeId(scope)), -1, 0);
- proj.setResultType(mKeyType);
- newForEachInnerPlan.add(proj);
- fe.addInputPlan(newForEachInnerPlan, false);
- }
-
- private void changeFunc(POForEach fe, PhysicalPlan plan, byte type) throws PlanException {
- List<PhysicalOperator> leaves = plan.getLeaves();
- if (leaves == null || leaves.size() != 1) {
- int errCode = 2019;
- String msg = "Expected to find plan with single leaf. Found " + leaves.size() + " leaves.";
- throw new PlanException(msg, errCode, PigException.BUG);
- }
-
- PhysicalOperator leaf = leaves.get(0);
- if (!(leaf instanceof POUserFunc)) {
- int errCode = 2020;
- String msg = "Expected to find plan with UDF leaf. Found " + leaf.getClass().getSimpleName();
- throw new PlanException(msg, errCode, PigException.BUG);
- }
- POUserFunc func = (POUserFunc)leaf;
- try {
- func.setAlgebraicFunction(type);
- } catch (ExecException e) {
- int errCode = 2075;
- String msg = "Could not set algebraic function type.";
- throw new PlanException(msg, errCode, PigException.BUG, e);
- }
- }
-
- private void fixUpRearrange(POLocalRearrange rearrange) throws PlanException {
+ private POLocalRearrange getNewRearrange(POLocalRearrange rearrange)
+ throws PlanException, CloneNotSupportedException {
+
+ POLocalRearrange newRearrange = rearrange.clone();
+
// Set the projection to be the key
PhysicalPlan newPlan = new PhysicalPlan();
- String scope = rearrange.getOperatorKey().scope;
+ String scope = newRearrange.getOperatorKey().scope;
POProject proj = new POProject(new OperatorKey(scope,
- NodeIdGenerator.getGenerator().getNextNodeId(scope)), -1,
- mKeyField);
- proj.setResultType(mKeyType);
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)), -1, 0);
+ proj.setResultType(newRearrange.getKeyType());
newPlan.add(proj);
+
List<PhysicalPlan> plans = new ArrayList<PhysicalPlan>(1);
plans.add(newPlan);
- rearrange.setPlansFromCombiner(plans);
+ newRearrange.setPlansFromCombiner(plans);
+
+ return newRearrange;
}
+ /**
+ * Checks if there is something that prevents the use of algebraic interface,
+ * and looks for the PODistinct that can be used as algebraic
+ *
+ */
private static class AlgebraicPlanChecker extends PhyPlanVisitor {
boolean sawNonAlgebraic = false;
boolean sawDistinctAgg = false;
private boolean sawForeach = false;
+ private PODistinct distinct = null;
+
AlgebraicPlanChecker(PhysicalPlan plan) {
super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
@@ -741,9 +691,10 @@ public class CombinerOptimizer extends M
sawNonAlgebraic = true;
}
}
-
+
@Override
public void visitDistinct(PODistinct distinct) throws VisitorException {
+ this.distinct = distinct;
if(sawDistinctAgg) {
// we want to combine only in the case where there is only
// one PODistinct which is the only input to an agg
@@ -787,7 +738,7 @@ public class CombinerOptimizer extends M
PhysicalOperator leaf = mPlan.getLeaves().get(0);
// the leaf has to be a POUserFunc (need not be algebraic)
if(leaf instanceof POUserFunc) {
-
+
// we want to combine only in the case where there is only
// one PODistinct which is the only input to an agg.
// Do not combine if there are additional inputs.
@@ -796,7 +747,7 @@ public class CombinerOptimizer extends M
sawNonAlgebraic = true;
return;
}
-
+
List<PhysicalOperator> immediateSuccs = mPlan.getSuccessors(distinct);
if(immediateSuccs.size() == 1 && immediateSuccs.get(0) instanceof POProject) {
if(checkSuccessorIsLeaf(leaf, immediateSuccs.get(0))) { // script 1 above
@@ -813,7 +764,7 @@ public class CombinerOptimizer extends M
}
}
}
-
+
}
}
}
@@ -821,7 +772,17 @@ public class CombinerOptimizer extends M
// the pattern we expected
sawNonAlgebraic = true;
}
-
+
+ /**
+ * @return the PODistinct recorded by visitDistinct, or null if sawNonAlgebraic is set or no PODistinct was visited
+ */
+ public PODistinct getDistinct() {
+ if(sawNonAlgebraic)
+ return null;
+ return distinct;
+ }
+
+ @Override
public void visitLimit(POLimit limit) throws VisitorException {
sawNonAlgebraic = true;
}
@@ -836,7 +797,7 @@ public class CombinerOptimizer extends M
}
return false;
}
-
+
@Override
public void visitFilter(POFilter filter) throws VisitorException {
sawNonAlgebraic = true;
@@ -857,7 +818,7 @@ public class CombinerOptimizer extends M
}
}
-
+
/**
* A visitor to replace
* Project[bag][*]
@@ -868,7 +829,7 @@ public class CombinerOptimizer extends M
*/
private static class DistinctPatcher extends PhyPlanVisitor {
- public boolean patched = false;
+ private POUserFunc distinct = null;
/**
* @param plan
* @param walker
@@ -884,7 +845,7 @@ public class CombinerOptimizer extends M
public DistinctPatcher(PhysicalPlan physicalPlan) {
this(physicalPlan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(physicalPlan));
}
-
+
/* (non-Javadoc)
* @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
*/
@@ -892,13 +853,13 @@ public class CombinerOptimizer extends M
public void visitProject(POProject proj) throws VisitorException {
// check if this project is preceded by PODistinct and
// has the return type bag
-
-
+
+
List<PhysicalOperator> preds = mPlan.getPredecessors(proj);
if(preds == null) return; // this is a leaf project and so not interesting for patching
PhysicalOperator pred = preds.get(0);
if(preds.size() == 1 && pred instanceof PODistinct) {
- if(patched) {
+ if(distinct != null) {
// we should not already have been patched since the
// Project-Distinct pair should occur only once
int errCode = 2076;
@@ -908,7 +869,9 @@ public class CombinerOptimizer extends M
// we have stick in the POUserfunc(org.apache.pig.builtin.Distinct)[DataBag]
// in place of the Project-PODistinct pair
PhysicalOperator distinctPredecessor = mPlan.getPredecessors(pred).get(0);
-
+
+ POUserFunc func = null;
+
try {
String scope = proj.getOperatorKey().scope;
List<PhysicalOperator> funcInput = new ArrayList<PhysicalOperator>();
@@ -919,7 +882,7 @@ public class CombinerOptimizer extends M
// originally a POForeach with return type BAG - we need to
// set it to tuple so we get a stream of tuples.
distinctPredecessor.setResultType(DataType.TUPLE);
- POUserFunc func = new POUserFunc(new OperatorKey(scope,
+ func = new POUserFunc(new OperatorKey(scope,
NodeIdGenerator.getGenerator().getNextNodeId(scope)),-1, funcInput, fSpec);
func.setResultType(DataType.BAG);
mPlan.replace(proj, func);
@@ -932,12 +895,17 @@ public class CombinerOptimizer extends M
String msg = "Problem with reconfiguring plan to add distinct built-in function.";
throw new OptimizerException(msg, errCode, PigException.BUG, e);
}
- patched = true;
+ distinct = func;
}
}
+ POUserFunc getDistinct(){
+ return distinct;
+ }
+
+
}
-
+
private static class fixMapProjects extends PhyPlanVisitor {
public fixMapProjects(PhysicalPlan plan) {
@@ -979,11 +947,4 @@ public class CombinerOptimizer extends M
}
- // Reset any member variables since we may have already visited one
- // combine.
- private void resetState() {
- mKeyField = -1;
- mKeyType = 0;
- keyFieldPositions = null;
- }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=1050757&r1=1050756&r2=1050757&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Sun Dec 19 03:58:20 2010
@@ -18,15 +18,12 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
@@ -38,8 +35,8 @@ import org.apache.pig.data.NonSpillableD
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.NullableTuple;
-import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Pair;
/**
@@ -63,8 +60,6 @@ public class POCombinerPackage extends P
private boolean[] mBags; // For each field, indicates whether or not it
// needs to be put in a bag.
- private boolean[] keyPositions;
-
private Map<Integer, Integer> keyLookup;
private int numBags;
@@ -75,10 +70,8 @@ public class POCombinerPackage extends P
* @param pkg POPackage to clone.
* @param bags for each field, indicates whether it should be a bag (true)
* or a simple field (false).
- * @param keyPos for each field in the output tuple of the foreach operator,
- * indicates whether it's the group key.
*/
- public POCombinerPackage(POPackage pkg, boolean[] bags, boolean[] keyPos) {
+ public POCombinerPackage(POPackage pkg, boolean[] bags) {
super(new OperatorKey(pkg.getOperatorKey().scope,
NodeIdGenerator.getGenerator().getNextNodeId(pkg.getOperatorKey().scope)),
pkg.getRequestedParallelism(), pkg.getInputs());
@@ -96,9 +89,6 @@ public class POCombinerPackage extends P
for (int i = 0; i < mBags.length; i++) {
if (mBags[i]) numBags++;
}
- if (keyPos != null) {
- keyPositions = Arrays.copyOf(keyPos, keyPos.length);
- }
}
@Override
@@ -189,10 +179,5 @@ public class POCombinerPackage extends P
return r;
}
-
- @Override
- public boolean[] getKeyPositionsInTuple() {
- return keyPositions.clone();
- }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1050757&r1=1050756&r2=1050757&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Sun Dec 19 03:58:20 2010
@@ -67,13 +67,7 @@ public class POPackage extends PhysicalO
*/
private static final long serialVersionUID = 1L;
- private static boolean[] SIMPLE_KEY_POSITION;
- static {
- SIMPLE_KEY_POSITION = new boolean[1];
- SIMPLE_KEY_POSITION[0] = true;
- }
-
public static enum PackageType { GROUP, JOIN };
//The iterator of indexed Tuples
@@ -383,18 +377,7 @@ public class POPackage extends PhysicalO
public void setKeyType(byte keyType) {
this.keyType = keyType;
}
-
- /**
- * Get the field positions of key in the output tuples.
- * For POPackage, the position is always 0. The POCombinerPackage,
- * however, can return different values.
- *
- * @return the field position of key in the output tuples.
- */
- public boolean[] getKeyPositionsInTuple() {
- return SIMPLE_KEY_POSITION.clone();
- }
-
+
/**
* Make a deep copy of this operator.
* @throws CloneNotSupportedException
Modified: pig/trunk/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCombiner.java?rev=1050757&r1=1050756&r2=1050757&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCombiner.java Sun Dec 19 03:58:20 2010
@@ -181,7 +181,9 @@ public class TestCombiner extends TestCa
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, props);
pigServer.registerQuery("a = load 'MultiCombinerUseInput.txt' as (x:int);");
pigServer.registerQuery("b = group a all;");
- pigServer.registerQuery("c = foreach b generate COUNT(a), SUM(a.$0), MIN(a.$0), MAX(a.$0), AVG(a.$0);");
+ pigServer.registerQuery("c = foreach b generate COUNT(a), SUM(a.$0), " +
+ "MIN(a.$0), MAX(a.$0), AVG(a.$0), ((double)SUM(a.$0))/COUNT(a.$0)," +
+ " COUNT(a.$0) + SUM(a.$0) + MAX(a.$0);");
// make sure there is a combine plan in the explain output
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -196,6 +198,9 @@ public class TestCombiner extends TestCa
assertEquals(0, t.get(2));
assertEquals(1, t.get(3));
assertEquals(0.5, t.get(4));
+ assertEquals(0.5, t.get(5));
+ assertEquals(512000L + 256000L + 1, t.get(6));
+
assertFalse(it.hasNext());
Util.deleteFile(cluster, "MultiCombinerUseInput.txt");
}
@@ -246,6 +251,79 @@ public class TestCombiner extends TestCa
Util.deleteFile(cluster, "distinctAggs1Input.txt");
}
+
+ @Test
+ public void testGroupElements() throws Exception {
+ // test use of combiner when group elements are accessed in the foreach
+ String input[] = {
+ "ABC\t1\ta\t1",
+ "ABC\t1\tb\t2",
+ "ABC\t1\ta\t3",
+ "ABC\t2\tb\t4",
+ "DEF\t1\td\t1",
+ "XYZ\t1\tx\t2"
+ };
+
+ Util.createInputFile(cluster, "testGroupElements.txt", input);
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer.registerQuery("a = load 'testGroupElements.txt' as (str:chararray, num1:int, alph : chararray, num2 : int);");
+ pigServer.registerQuery("b = group a by (str, num1);");
+
+ //check if combiner is present or not for various forms of foreach
+ pigServer.registerQuery("c = foreach b generate flatten(group), COUNT(a.alph), SUM(a.num2); ");
+ checkCombinerUsed(pigServer, "c", true);
+
+ pigServer.registerQuery("c = foreach b generate group, COUNT(a.alph), SUM(a.num2); ");
+ checkCombinerUsed(pigServer, "c", true);
+
+ // projecting bag - combiner should not be used
+ pigServer.registerQuery("c = foreach b generate group, a, COUNT(a.alph), SUM(a.num2); ");
+ checkCombinerUsed(pigServer, "c", false);
+
+ // projecting bag - combiner should not be used
+ pigServer.registerQuery("c = foreach b generate group, a.num2, COUNT(a.alph), SUM(a.num2); ");
+ checkCombinerUsed(pigServer, "c", false);
+
+ pigServer.registerQuery("c = foreach b generate group.$0, group.$1, COUNT(a.alph), SUM(a.num2); ");
+ checkCombinerUsed(pigServer, "c", true);
+
+ pigServer.registerQuery("c = foreach b generate group.$0, group.$1 + COUNT(a.alph), SUM(a.num2); ");
+ checkCombinerUsed(pigServer, "c", true);
+
+ pigServer.registerQuery("c = foreach b generate group.str, group.$1, COUNT(a.alph), SUM(a.num2); ");
+ checkCombinerUsed(pigServer, "c", true);
+
+ pigServer.registerQuery("c = foreach b generate group.str, group.$1, COUNT(a.alph), SUM(a.num2), " +
+ " (group.num1 == 1 ? (COUNT(a.num2) + 1) : (SUM(a.num2) + 10)) ; ");
+ checkCombinerUsed(pigServer, "c", true);
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('ABC',1,3L,6L,4L)",
+ "('ABC',2,1L,4L,14L)",
+ "('DEF',1,1L,1L,2L)",
+ "('XYZ',1,1L,2L,2L)",
+ });
+
+ Iterator<Tuple> it = pigServer.openIterator("c");
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+ Util.deleteFile(cluster, "testGroupElements.txt");
+
+ }
+
+ private void checkCombinerUsed(PigServer pigServer, String string, boolean combineExpected)
+ throws IOException {
+ // explain the alias named by 'string' and check whether a combine plan shows up in the output
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ pigServer.explain(string, ps);
+ boolean combinerFound = baos.toString().matches("(?si).*combine plan.*");
+ System.out.println(baos.toString());
+ assertEquals("is combiner present as expected", combineExpected, combinerFound);
+ }
+
@Test
public void testDistinctNoCombiner() throws Exception {
Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=1050757&r1=1050756&r2=1050757&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java Sun Dec 19 03:58:20 2010
@@ -431,7 +431,7 @@ public class TestMultiQueryCompiler {
myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
myPig.registerQuery("store c2 into '/tmp/output2';");
myPig.registerQuery("d1 = group d by gid;");
- myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+ myPig.registerQuery("d2 = foreach d1 generate group, d.uname, MAX(d.uid) - MIN(d.uid);");
myPig.registerQuery("store d2 into '/tmp/output3';");
LogicalPlan lp = checkLogicalPlan(1, 3, 19);