Posted to commits@pig.apache.org by xu...@apache.org on 2016/02/05 05:43:12 UTC
svn commit: r1728600 - in /pig/branches/spark:
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/
src/org/apache/pig/backend/hadoop/executionengine/spa...
Author: xuefu
Date: Fri Feb 5 04:43:12 2016
New Revision: 1728600
URL: http://svn.apache.org/viewvc?rev=1728600&view=rev
Log:
PIG-4766: Ensure GroupBy is optimized for all algebraic Operations (Pallavi via Xuefu)
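
The optimization applies to UDFs that implement Pig's org.apache.pig.Algebraic interface, which splits a computation into three stages: Initial runs map-side on single input tuples, Intermediate merges partial results (in a combiner, or here in Spark's reduceBy), and Final produces the answer reduce-side. For orientation, here is a simplified COUNT-like sketch of such a UDF; the class is illustrative and not part of this patch:

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.pig.Algebraic;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // Simplified COUNT-like algebraic UDF (illustrative; not from this patch).
    public class MyCount extends EvalFunc<Long> implements Algebraic {
        private static final TupleFactory tf = TupleFactory.getInstance();

        @Override
        public Long exec(Tuple input) throws IOException {
            // Non-optimized path: count the tuples in the input bag directly.
            return ((DataBag) input.get(0)).size();
        }

        // Each getter names the EvalFunc class Pig runs at that stage.
        public String getInitial()  { return Initial.class.getName(); }
        public String getIntermed() { return Intermediate.class.getName(); }
        public String getFinal()    { return Final.class.getName(); }

        // Map side: sees a bag holding a single input tuple, emits a partial count.
        public static class Initial extends EvalFunc<Tuple> {
            @Override
            public Tuple exec(Tuple input) throws IOException {
                Iterator<Tuple> it = ((DataBag) input.get(0)).iterator();
                return tf.newTuple(it.hasNext() ? 1L : 0L);
            }
        }

        // Merge stage: folds partial counts into another partial count.
        public static class Intermediate extends EvalFunc<Tuple> {
            @Override
            public Tuple exec(Tuple input) throws IOException {
                return tf.newTuple(sumPartials(input));
            }
        }

        // Reduce side: same fold, but emits the final scalar.
        public static class Final extends EvalFunc<Long> {
            @Override
            public Long exec(Tuple input) throws IOException {
                return sumPartials(input);
            }
        }

        private static Long sumPartials(Tuple input) throws IOException {
            long sum = 0;
            for (Tuple t : (DataBag) input.get(0)) {
                sum += (Long) t.get(0); // each tuple carries one partial count
            }
            return sum;
        }
    }
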
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
pig/branches/spark/test/org/apache/pig/test/TestCombiner.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java?rev=1728600&r1=1728599&r2=1728600&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java Fri Feb 5 04:43:12 2016
@@ -25,6 +25,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.DataType;
import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
@@ -138,10 +139,23 @@ public class PORelationToExprProject ext
sendEmptyBagOnEOP = false;
return(r);
}
-
+
+ // See PIG-4644
@Override
public PORelationToExprProject clone() throws CloneNotSupportedException {
- return (PORelationToExprProject) super.clone();
+ ArrayList<Integer> cols = new ArrayList<>(columns.size());
+ // Can reuse the same Integer objects, as they are immutable
+ for (Integer i : columns) {
+ cols.add(i);
+ }
+ PORelationToExprProject clone = new PORelationToExprProject(new OperatorKey(mKey.scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
+ requestedParallelism, cols);
+ clone.cloneHelper(this);
+ clone.overloaded = overloaded;
+ clone.resultType = resultType;
+ clone.sendEmptyBagOnEOP = sendEmptyBagOnEOP;
+ return clone;
}
}
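
The rewritten clone() above replaces a bare super.clone(), which performs a shallow copy: Object.clone() copies fields by reference, so the clone would share its OperatorKey and columns list with the original (the problem tracked in PIG-4644). A minimal stand-alone illustration of that hazard, using invented names rather than Pig classes:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Invented class demonstrating the shallow-clone hazard; not Pig code.
    class ShallowOp implements Cloneable {
        List<Integer> columns = new ArrayList<>(Arrays.asList(0));

        @Override
        public ShallowOp clone() throws CloneNotSupportedException {
            return (ShallowOp) super.clone(); // shallow: clone.columns == this.columns
        }

        public static void main(String[] args) throws Exception {
            ShallowOp original = new ShallowOp();
            ShallowOp copy = original.clone();
            copy.columns.add(1);                  // mutates the shared list
            System.out.println(original.columns); // prints [0, 1], not [0]
        }
    }
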
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java?rev=1728600&r1=1728599&r2=1728600&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java Fri Feb 5 04:43:12 2016
@@ -172,12 +172,17 @@ public class ReduceByConverter implement
@Override
public Tuple apply(Tuple v1, Tuple v2) {
LOG.debug("MergeValuesFunction in : " + v1 + " , " + v2);
- Tuple result = tf.newTuple();
+ Tuple result = tf.newTuple(2);
DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
Tuple t = new DefaultTuple();
try {
// Package the input tuples so they can be processed by Algebraic functions.
Object key = v1.get(0);
+ if (key == null) {
+ key = "";
+ } else {
+ result.set(0, key);
+ }
bag.add((Tuple) v1.get(1));
bag.add((Tuple) v2.get(1));
t.append(key);
@@ -197,14 +202,14 @@ public class ReduceByConverter implement
// But, we want the result to look like this:
// (ABC,((2),(3))) - A tuple with key and a value tuple (containing values).
// Hence, the construction of a new value tuple
- result.append(t.get(0));
+
Tuple valueTuple = tf.newTuple();
for (Object o : ((Tuple) r.result).getAll()) {
if (!o.equals(key)) {
valueTuple.append(o);
}
}
- result.append(valueTuple);
+ result.set(1,valueTuple);
LOG.debug("MergeValuesFunction out : " + result);
return result;
} catch (ExecException e) {
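
To make the merge step above concrete: the function receives two (key, valueTuple) pairs for the same key, bags the value tuples into a (key, {values}) tuple for the algebraic UDF, and then rebuilds a two-field (key, valueTuple) result so later merges see the same shape. A small sketch of those shapes using Pig's tuple and bag factories, with invented values:

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class MergeShapeDemo {
        public static void main(String[] args) throws ExecException {
            TupleFactory tf = TupleFactory.getInstance();
            BagFactory bf = BagFactory.getInstance();

            // Two partial results for the same key, e.g. (ABC,(2)) and (ABC,(3)).
            Tuple v1 = tf.newTuple(2);
            v1.set(0, "ABC");
            v1.set(1, tf.newTuple((Object) 2));
            Tuple v2 = tf.newTuple(2);
            v2.set(0, "ABC");
            v2.set(1, tf.newTuple((Object) 3));

            // The merge function bags the value tuples and prepends the key,
            // producing (ABC,{(2),(3)}) as input for the algebraic UDF.
            DataBag bag = bf.newDefaultBag();
            bag.add((Tuple) v1.get(1));
            bag.add((Tuple) v2.get(1));
            Tuple udfInput = tf.newTuple(2);
            udfInput.set(0, v1.get(0));
            udfInput.set(1, bag);

            // After the UDF runs, the result is rebuilt as a two-field
            // (key, valueTuple) pair, e.g. (ABC,((5))) for a SUM-like UDF,
            // so that further merges see the same shape.
            System.out.println(udfInput);
        }
    }
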
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java?rev=1728600&r1=1728599&r2=1728600&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java Fri Feb 5 04:43:12 2016
@@ -17,7 +17,6 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -26,8 +25,8 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -54,7 +53,6 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.plan.optimizer.OptimizerException;
import org.apache.pig.impl.util.Pair;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
@@ -73,7 +71,7 @@ public class CombinerOptimizer extends S
public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
try {
addCombiner(sparkOp.physicalPlan);
- } catch (PlanException e) {
+ } catch (Exception e) {
throw new VisitorException(e);
}
}
@@ -91,7 +89,7 @@ public class CombinerOptimizer extends S
// -> localRearrange
// -> foreach (using algebraicOp.Initial)
// -> CombinerRearrange
- private void addCombiner(PhysicalPlan phyPlan) throws VisitorException, PlanException {
+ private void addCombiner(PhysicalPlan phyPlan) throws VisitorException, PlanException, CloneNotSupportedException {
List<PhysicalOperator> leaves = phyPlan.getLeaves();
if (leaves == null || leaves.size() != 1) {
@@ -120,6 +118,9 @@ public class CombinerOptimizer extends S
}
PhysicalOperator successor = poPackageSuccessors.get(0);
+ // Retaining the original successor to be used later in modifying the plan.
+ PhysicalOperator packageSuccessor = successor;
+
if (successor instanceof POLimit) {
// POLimit is acceptable, as long as it has a single foreach as
// successor
@@ -137,11 +138,14 @@ public class CombinerOptimizer extends S
if (foreachSuccessors == null || foreachSuccessors.size() != 1) {
continue;
}
- List<PhysicalPlan> feInners = foreach.getInputPlans();
+ // Clone foreach so it can be modified to a post-reduce foreach.
+ POForEach postReduceFE = foreach.clone();
+ List<PhysicalPlan> feInners = postReduceFE.getInputPlans();
// find algebraic operators and also check if the foreach statement
// is suitable for combiner use
- List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = findAlgebraicOps(feInners);
+ List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = CombinerOptimizerUtil.findAlgebraicOps
+ (feInners);
if (algebraicOps == null || algebraicOps.size() == 0) {
// the plan is not combinable or there is nothing to combine
// we're done
@@ -149,19 +153,18 @@ public class CombinerOptimizer extends S
}
try {
List<PhysicalOperator> glrPredecessors = phyPlan.getPredecessors(glr);
- if (glrPredecessors == null || glrPredecessors.isEmpty()) {
+ // Exclude co-group from optimization
+ if (glrPredecessors == null || glrPredecessors.size() != 1) {
continue;
}
if (!(glrPredecessors.get(0) instanceof POLocalRearrange)) {
continue;
}
- LOG.info("Algebraic operations found. Optimizing plan to use combiner.");
POLocalRearrange rearrange = (POLocalRearrange) glrPredecessors.get(0);
- PhysicalOperator foreachSuccessor = foreachSuccessors.get(0);
- // Clone foreach so it can be modified to an operation post-reduce.
- POForEach postReduceFE = foreach.clone();
+
+ LOG.info("Algebraic operations found. Optimizing plan to use combiner.");
// Trim the global rearrange and the preceding package.
convertToMapSideForEach(phyPlan, poPackage);
@@ -183,7 +186,7 @@ public class CombinerOptimizer extends S
}
// create new map foreach -
- POForEach mfe = CombinerOptimizerUtil.createForEachWithGrpProj(foreach, poPackage.getPkgr()
+ POForEach mfe = CombinerOptimizerUtil.createForEachWithGrpProj(postReduceFE, poPackage.getPkgr()
.getKeyType());
Map<PhysicalOperator, Integer> op2newpos = Maps.newHashMap();
Integer pos = 1;
@@ -210,7 +213,7 @@ public class CombinerOptimizer extends S
}
// create new combine foreach
- POForEach cfe = CombinerOptimizerUtil.createForEachWithGrpProj(foreach, poPackage.getPkgr()
+ POForEach cfe = CombinerOptimizerUtil.createForEachWithGrpProj(postReduceFE, poPackage.getPkgr()
.getKeyType());
// add algebraic functions with appropriate projection
CombinerOptimizerUtil.addAlgebraicFuncToCombineFE(cfe, op2newpos);
@@ -250,25 +253,29 @@ public class CombinerOptimizer extends S
.getRequestedParallelism(),
cfe.getInputPlans(), cfe.getToBeFlattened(), combinePack.getPkgr());
reduceOperator.setCustomPartitioner(glr.getCustomPartitioner());
- fixReduceSideFE(postReduceFE, cfe);
+ fixReduceSideFE(postReduceFE, algebraicOps);
CombinerOptimizerUtil.changeFunc(reduceOperator, POUserFunc.INTERMEDIATE);
updatePackager(reduceOperator, newRearrange);
// Add the new operators
phyPlan.add(reduceOperator);
phyPlan.add(newRearrange);
- phyPlan.add(postReduceFE);
- // Reconnect as follows :
- // foreach (using algebraicOp.Final)
- // -> reduceBy (uses algebraicOp.Intermediate)
+ phyPlan.add(mfe);
+ // Connect the new operators as follows:
+ // reduceBy (using algebraicOp.Intermediate)
+ // -> rearrange
// -> foreach (using algebraicOp.Initial)
- phyPlan.disconnect(foreach, foreachSuccessor);
- phyPlan.connect(foreach, newRearrange);
+ phyPlan.connect(mfe, newRearrange);
phyPlan.connect(newRearrange, reduceOperator);
- phyPlan.connect(reduceOperator, postReduceFE);
- phyPlan.replace(foreach, mfe);
- phyPlan.connect(postReduceFE, foreachSuccessor);
+ // Insert the reduce stage between combiner rearrange and its successor.
+ phyPlan.disconnect(combinerLocalRearrange, packageSuccessor);
+ phyPlan.connect(reduceOperator, packageSuccessor);
+ phyPlan.connect(combinerLocalRearrange, mfe);
+
+ // Replace foreach with post reduce foreach
+ phyPlan.add(postReduceFE);
+ phyPlan.replace(foreach, postReduceFE);
} catch (Exception e) {
int errCode = 2018;
String msg = "Internal error. Unable to introduce the combiner for optimization.";
@@ -278,6 +285,30 @@ public class CombinerOptimizer extends S
}
}
+ // Modifies the input plans of the post-reduce foreach to match the output of the reduce stage.
+ private void fixReduceSideFE(POForEach postReduceFE, List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps)
+ throws ExecException, PlanException {
+ int i=1;
+ for (Pair<PhysicalOperator, PhysicalPlan> algebraicOp : algebraicOps) {
+ POUserFunc combineUdf = (POUserFunc) algebraicOp.first;
+ PhysicalPlan pplan = algebraicOp.second;
+ combineUdf.setAlgebraicFunction(POUserFunc.FINAL);
+
+ POProject newProj = new POProject(
+ CombinerOptimizerUtil.createOperatorKey(postReduceFE.getOperatorKey().getScope()),
+ 1, i
+ );
+ newProj.setResultType(DataType.BAG);
+
+ PhysicalOperator udfInput = pplan.getPredecessors(combineUdf).get(0);
+ pplan.disconnect(udfInput, combineUdf);
+ pplan.add(newProj);
+ pplan.connect(newProj, combineUdf);
+ i++;
+ }
+ postReduceFE.setResultType(DataType.TUPLE);
+ }
+
// Modifies the map side of foreach (before reduce).
private void convertToMapSideForEach(PhysicalPlan physicalPlan, POPackage poPackage)
throws PlanException {
@@ -296,21 +327,6 @@ public class CombinerOptimizer extends S
physicalPlan.removeAndReconnect(poPackage);
}
-
- // TODO: Modify the post reduce plan in case of nested algebraic(ExpressionOperator) or logical operations.
- private void fixReduceSideFE(POForEach postReduceFE, POForEach cfe) throws PlanException,
- CloneNotSupportedException {
- List<PhysicalPlan> plans = cfe.getInputPlans();
- List<PhysicalPlan> newPlans = new ArrayList<>();
- for (int i = 0; i < plans.size(); i++) {
- PhysicalPlan inputPlan = plans.get(i).clone();
- newPlans.add(inputPlan);
- }
- postReduceFE.setInputPlans(newPlans);
- CombinerOptimizerUtil.changeFunc(postReduceFE, POUserFunc.FINAL);
- postReduceFE.setResultType(DataType.TUPLE);
- }
-
// Update the ReduceBy Operator with the packaging used by Local rearrange.
private void updatePackager(POReduceBySpark reduceOperator, POLocalRearrange lrearrange) throws OptimizerException {
Packager pkgr = reduceOperator.getPkgr();
@@ -318,7 +334,7 @@ public class CombinerOptimizer extends S
// update the keyInfo information if already present in the POPackage
Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkgr.getKeyInfo();
if (keyInfo == null)
- keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+ keyInfo = new HashMap<>();
if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
// something is wrong - we should not be getting key info
@@ -339,82 +355,6 @@ public class CombinerOptimizer extends S
}
/**
- * find algebraic operators and also check if the foreach statement is
- * suitable for combiner use
- *
- * @param feInners inner plans of foreach
- * @return null if plan is not combinable, otherwise list of combinable operators
- * @throws VisitorException
- */
- // TODO : Since all combinable cases are not handled, not using the utility method in CombinerOptimizerUtil
- private static List<Pair<PhysicalOperator, PhysicalPlan>> findAlgebraicOps(List<PhysicalPlan> feInners)
- throws VisitorException {
- List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = Lists.newArrayList();
-
- // check each foreach inner plan
- for (PhysicalPlan pplan : feInners) {
- // check for presence of non combinable operators
- CombinerOptimizerUtil.AlgebraicPlanChecker algChecker = new CombinerOptimizerUtil.AlgebraicPlanChecker
- (pplan);
- algChecker.visit();
- if (algChecker.sawNonAlgebraic) {
- return null;
- }
-
- // TODO : Distinct is combinable. Handle it.
- if (algChecker.sawDistinctAgg) {
- return null;
- }
-
- List<PhysicalOperator> roots = pplan.getRoots();
- // combinable operators have to be attached to POProject root(s)
- for (PhysicalOperator root : roots) {
- if (root instanceof ConstantExpression) {
- continue;
- }
- if (!(root instanceof POProject)) {
- // how can this happen? - expect root of inner plan to be
- // constant or project. not combining it
- return null;
- }
- POProject proj = (POProject) root;
- POUserFunc combineUdf = getAlgebraicSuccessor(pplan);
- if (combineUdf == null) {
- if (proj.isProjectToEnd()) {
- // project-star or project to end
- // not combinable
- return null;
- }
- // Check to see if this is a projection of the grouping column.
- // If so, it will be a projection of col 0
- List<Integer> cols = proj.getColumns();
- if (cols != null && cols.size() == 1 && cols.get(0) == 0) {
- // it is project of grouping column, so the plan is
- // still combinable
- continue;
- } else {
- //not combinable
- return null;
- }
- }
-
- // The algebraic udf can have more than one input. Add the udf only once
- boolean exist = false;
- for (Pair<PhysicalOperator, PhysicalPlan> pair : algebraicOps) {
- if (pair.first.equals(combineUdf)) {
- exist = true;
- break;
- }
- }
- if (!exist)
- algebraicOps.add(new Pair<PhysicalOperator, PhysicalPlan>(combineUdf, pplan));
- }
- }
-
- return algebraicOps;
- }
-
- /**
* Look for an algebraic POUserFunc that is the leaf of an input plan.
*
* @param pplan physical plan
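
Stepping back, the connect/disconnect calls in this file rewire the physical plan roughly as follows (a sketch reconstructed from the code above, not an authoritative plan dump; a POLimit between the package and the foreach, if present, keeps its place):

    Before:  localRearrange -> globalRearrange -> package -> foreach(algebraic) -> successor

    After:   localRearrange -> foreach(Initial) -> localRearrange
                            -> reduceBy(Intermediate) -> foreach(Final) -> successor

The new map-side foreach (mfe) applies each UDF's Initial stage, POReduceBySpark merges partial results with the Intermediate stage, and the cloned post-reduce foreach (postReduceFE), whose input plans fixReduceSideFE() rewrites to project the reduceBy output, applies the Final stage.
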
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1728600&r1=1728599&r2=1728600&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Fri Feb 5 04:43:12 2016
@@ -316,7 +316,7 @@ public class CombinerOptimizerUtil {
* @return null if plan is not combinable, otherwise list of combinable operators
* @throws VisitorException
*/
- private static List<Pair<PhysicalOperator, PhysicalPlan>> findAlgebraicOps(List<PhysicalPlan> feInners)
+ public static List<Pair<PhysicalOperator, PhysicalPlan>> findAlgebraicOps(List<PhysicalPlan> feInners)
throws VisitorException {
List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = Lists.newArrayList();
@@ -570,7 +570,7 @@ public class CombinerOptimizerUtil {
return pclr;
}
- private static OperatorKey createOperatorKey(String scope) {
+ public static OperatorKey createOperatorKey(String scope) {
return new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope));
}
@@ -663,13 +663,13 @@ public class CombinerOptimizerUtil {
* Checks if there is something that prevents the use of algebraic interface,
* and looks for the PODistinct that can be used as algebraic
*/
- public static class AlgebraicPlanChecker extends PhyPlanVisitor {
- public boolean sawNonAlgebraic = false;
- public boolean sawDistinctAgg = false;
+ private static class AlgebraicPlanChecker extends PhyPlanVisitor {
+ boolean sawNonAlgebraic = false;
+ boolean sawDistinctAgg = false;
private boolean sawForeach = false;
private PODistinct distinct = null;
- public AlgebraicPlanChecker(PhysicalPlan plan) {
+ AlgebraicPlanChecker(PhysicalPlan plan) {
super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
}
Modified: pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java?rev=1728600&r1=1728599&r2=1728600&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java (original)
+++ pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java Fri Feb 5 04:43:12 2016
@@ -63,7 +63,7 @@ public class TestLocationInPhysicalPlan
if (Util.getLocalTestMode().toString().equals("TEZ_LOCAL")) {
Assert.assertEquals("A[1,4],A[3,4],B[2,4]", jStats.getAliasLocation());
} else if (Util.getLocalTestMode().toString().equals("SPARK_LOCAL")) {
- Assert.assertEquals("A[1,4],B[2,4],A[3,4]", jStats.getAliasLocation());
+ Assert.assertEquals("A[1,4],A[3,4],B[2,4],A[3,4]", jStats.getAliasLocation());
} else {
Assert.assertEquals("M: A[1,4],A[3,4],B[2,4] C: A[3,4],B[2,4] R: A[3,4]", jStats.getAliasLocation());
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCombiner.java?rev=1728600&r1=1728599&r2=1728600&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCombiner.java Fri Feb 5 04:43:12 2016
@@ -412,7 +412,13 @@ public class TestCombiner {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
pigServer.explain(variable, ps);
- boolean combinerFound = baos.toString().matches("(?si).*combine plan.*");
+ boolean combinerFound;
+ if (pigServer.getPigContext().getExecType().name().equalsIgnoreCase("spark")) {
+ combinerFound = baos.toString().contains("Reduce By");
+ } else {
+ combinerFound = baos.toString().matches("(?si).*combine plan.*");
+ }
+
System.out.println(baos.toString());
assertEquals("is combiner present as expected", combineExpected, combinerFound);
}
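
For reference, the check above boils down to explaining an algebraic group-by and scanning the printed plan. A condensed, hypothetical version of that flow (alias names, schema, and input path are invented, and the "spark_local" exec type is assumed to exist on this branch):

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    import org.apache.pig.PigServer;

    public class ExplainCheckDemo {
        public static void main(String[] args) throws Exception {
            // Assumption: the spark branch registers a "spark_local" exec type.
            PigServer pigServer = new PigServer("spark_local");
            pigServer.registerQuery("A = LOAD 'input.txt' AS (k:chararray, v:long);");
            pigServer.registerQuery("B = GROUP A BY k;");
            pigServer.registerQuery("C = FOREACH B GENERATE group, SUM(A.v);"); // SUM is algebraic

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            pigServer.explain("C", new PrintStream(baos));

            // On Spark the optimized plan surfaces as a "Reduce By" operator;
            // other engines print a "combine plan" section instead.
            System.out.println("combiner used: " + baos.toString().contains("Reduce By"));
        }
    }
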