You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/02/10 22:23:24 UTC
svn commit: r1729740 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/
test/org/apache/pig/tez/
Author: rohini
Date: Wed Feb 10 21:23:24 2016
New Revision: 1729740
URL: http://svn.apache.org/viewvc?rev=1729740&view=rev
Log:
PIG-4802: Autoparallelism should estimate less when there is combiner (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1729740&r1=1729739&r2=1729740&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Feb 10 21:23:24 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4802: Autoparallelism should estimate less when there is combiner (rohini)
+
PIG-4761: Add more information to front end error messages (eyal via daijy)
PIG-4792: Do not add java and sun system properties to jobconf (rohini)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1729740&r1=1729739&r2=1729740&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Wed Feb 10 21:23:24 2016
@@ -146,7 +146,7 @@ public class TezOperator extends Operato
private boolean useGraceParallelism = false;
- private double parallelismFactor = -1;
+ private Map<OperatorKey, Double> parallelismFactorPerSuccessor;
private Boolean intermediateReducer = null;
@@ -658,13 +658,25 @@ public class TezOperator extends Operato
return useGraceParallelism;
}
- public double getParallelismFactor() throws VisitorException {
- if (parallelismFactor == -1) {
- TezParallelismFactorVisitor parallelismFactorVisitor = new TezParallelismFactorVisitor(plan, getOperatorKey().toString());
+ public double getParallelismFactor(TezOperator successor) throws VisitorException {
+ if (parallelismFactorPerSuccessor == null) {
+ parallelismFactorPerSuccessor = new HashMap<OperatorKey, Double>();
+ }
+ Double factor = parallelismFactorPerSuccessor.get(successor.getOperatorKey());
+ if (factor == null) {
+ // We determine a separate parallelism factor for each successor (edge).
+ // For example, if there are two successors, one with a combine plan and one
+ // without, we want a smaller parallelism factor for the one with the combine
+ // plan, as that edge will receive less data.
+ // TODO: To be more accurate, we should only look at the split sub-plan that
+ // writes to that successor edge. Currently a FILTER in one sub-plan is accounted
+ // for in all the successors, which is not right.
+ TezParallelismFactorVisitor parallelismFactorVisitor = new TezParallelismFactorVisitor(this, successor);
parallelismFactorVisitor.visit();
- parallelismFactor = parallelismFactorVisitor.getFactor();
+ factor = parallelismFactorVisitor.getFactor();
+ parallelismFactorPerSuccessor.put(successor.getOperatorKey(), factor);
}
- return parallelismFactor;
+ return factor;
}
public Boolean isIntermediateReducer() throws IOException {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1729740&r1=1729739&r2=1729740&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Wed Feb 10 21:23:24 2016
@@ -26,8 +26,10 @@ import org.apache.hadoop.conf.Configurat
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
@@ -42,6 +44,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
@@ -55,7 +58,7 @@ import org.apache.tez.dag.api.EdgeProper
*
* Since currently it is only possible to reduce the parallelism
* estimation is exaggerated and will rely on Tez runtime to
- * descrease the parallelism
+ * decrease the parallelism
*/
public class TezOperDependencyParallelismEstimator implements TezParallelismEstimator {
@@ -64,6 +67,14 @@ public class TezOperDependencyParallelis
static final double DEFAULT_FILTER_FACTOR = 0.7;
static final double DEFAULT_LIMIT_FACTOR = 0.1;
+ // In most cases distinct does not reduce the data much,
+ // so keep the factor high at 0.9.
+ static final double DEFAULT_DISTINCT_FACTOR = 0.9;
+
+ // In most cases aggregation can reduce the data by a lot,
+ // but keep the factor at 0.7 to account for worst-case scenarios.
+ static final double DEFAULT_AGGREGATION_FACTOR = 0.7;
+
private PigContext pc;
@Override
@@ -118,7 +129,7 @@ public class TezOperDependencyParallelis
//For cases like Union we can just limit to sum of pred vertices parallelism
boolean applyFactor = !tezOper.isUnion();
if (!pred.isVertexGroup() && applyFactor) {
- predParallelism = predParallelism * pred.getParallelismFactor();
+ predParallelism = predParallelism * pred.getParallelismFactor(tezOper);
}
estimatedParallelism += predParallelism;
}
@@ -175,9 +186,24 @@ public class TezOperDependencyParallelis
public static class TezParallelismFactorVisitor extends PhyPlanVisitor {
private double factor = 1;
private String outputKey;
- public TezParallelismFactorVisitor(PhysicalPlan plan, String outputKey) {
- super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
- this.outputKey = outputKey;
+ private TezOperator tezOp;
+
+ public TezParallelismFactorVisitor(TezOperator tezOp, TezOperator successor) {
+ super(tezOp.plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(tezOp.plan));
+ this.tezOp = tezOp;
+ this.outputKey = tezOp.getOperatorKey().toString();
+
+ if (successor != null) {
+ // Map side combiner
+ TezEdgeDescriptor edge = tezOp.outEdges.get(successor.getOperatorKey());
+ if (!edge.combinePlan.isEmpty()) {
+ if (successor.isDistinct()) {
+ factor = DEFAULT_DISTINCT_FACTOR;
+ } else {
+ factor = DEFAULT_AGGREGATION_FACTOR;
+ }
+ }
+ }
}
@Override
@@ -195,11 +221,17 @@ public class TezOperDependencyParallelis
@Override
public void visitPOForEach(POForEach nfe) throws VisitorException {
List<Boolean> flattens = nfe.getToBeFlattened();
+ List<PhysicalPlan> inputPlans = nfe.getInputPlans();
boolean containFlatten = false;
- for (boolean flatten : flattens) {
- if (flatten) {
- containFlatten = true;
- break;
+ for (int i = 0; i < flattens.size(); i++) {
+ if (flattens.get(i)) {
+ PhysicalPlan inputPlan = inputPlans.get(i);
+ PhysicalOperator root = inputPlan.getRoots().get(0);
+ if (root instanceof POProject
+ && root.getResultType() == DataType.BAG) {
+ containFlatten = true;
+ break;
+ }
}
}
if (containFlatten) {
@@ -227,6 +259,12 @@ public class TezOperDependencyParallelis
// JoinPackager is equivalent to a foreach flatten after shuffle
if (pkg.getPkgr() instanceof JoinPackager) {
factor *= DEFAULT_FLATTEN_FACTOR;
+ } else if (pkg.getPkgr() instanceof CombinerPackager) {
+ if (tezOp.isDistinct()) {
+ factor *= DEFAULT_DISTINCT_FACTOR;
+ } else {
+ factor *= DEFAULT_AGGREGATION_FACTOR;
+ }
}
}
Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1729740&r1=1729739&r2=1729740&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Wed Feb 10 21:23:24 2016
@@ -40,6 +40,7 @@ import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder;
import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
import org.apache.pig.data.Tuple;
@@ -299,6 +300,23 @@ public class TestTezAutoParallelism {
}
@Test
+ public void testFlattenParallelism() throws IOException{
+ String outputDir = "/tmp/testFlattenParallelism";
+ String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"
+ + "B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"
+ + "C = join A by name, B by name using 'skewed' parallel 1;"
+ + "C1 = group C by A::name;"
+ + "C2 = FOREACH C1 generate group, FLATTEN(C);"
+ + "D = group C2 by group;"
+ + "E = foreach D generate group, COUNT(C2.A::name);"
+ + "STORE E into '" + outputDir + "/finalout';";
+ String log = testAutoParallelism(script, outputDir, true, TezJobCompiler.class, TezDagBuilder.class);
+ assertTrue(log.contains("For vertex - scope-74: parallelism=10"));
+ assertTrue(log.contains("For vertex - scope-75: parallelism=70"));
+ assertTrue(log.contains("Total estimated parallelism is 89"));
+ }
+
+ @Test
public void testIncreaseIntermediateParallelism1() throws IOException{
// User specified parallelism is overriden for intermediate step
String outputDir = "/tmp/testIncreaseIntermediateParallelism";
@@ -312,7 +330,7 @@ public class TestTezAutoParallelism {
// Parallelism of C should be increased
assertTrue(log.contains("Increased requested parallelism of scope-59 to 4"));
assertEquals(1, StringUtils.countMatches(log, "Increased requested parallelism"));
- assertTrue(log.contains("Total estimated parallelism is 52"));
+ assertTrue(log.contains("Total estimated parallelism is 40"));
}
@Test
@@ -358,11 +376,15 @@ public class TestTezAutoParallelism {
}
private String testIncreaseIntermediateParallelism(String script, String outputDir, boolean sortAndCheck) throws IOException {
+ return testAutoParallelism(script, outputDir, sortAndCheck, ParallelismSetter.class, TezJobCompiler.class);
+ }
+
+ private String testAutoParallelism(String script, String outputDir, boolean sortAndCheck, Class... classesToLog) throws IOException {
NodeIdGenerator.reset();
PigServer.resetScope();
StringWriter writer = new StringWriter();
// When there is a combiner operation involved user specified parallelism is overriden
- Util.createLogAppender("testIncreaseIntermediateParallelism", writer, ParallelismSetter.class, TezJobCompiler.class);
+ Util.createLogAppender("testAutoParallelism", writer, classesToLog);
try {
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
@@ -390,7 +412,7 @@ public class TestTezAutoParallelism {
}
return writer.toString();
} finally {
- Util.removeLogAppender("testIncreaseIntermediateParallelism", ParallelismSetter.class, TezJobCompiler.class);
+ Util.removeLogAppender("testAutoParallelism", classesToLog);
Util.deleteFile(cluster, outputDir);
}
}
Modified: pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1729740&r1=1729739&r2=1729740&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java Wed Feb 10 21:23:24 2016
@@ -139,11 +139,11 @@ public class TestTezGraceParallelism {
.getTuplesFromConstantTupleStrings(new String[] {
"('F',1349L)", "('M',1373L)"});
Util.checkQueryOutputsAfterSort(iter, expectedResults);
- assertTrue(writer.toString().contains("Initialize parallelism for scope-52 to 20"));
- assertTrue(writer.toString().contains("Initialize parallelism for scope-61 to 100"));
+ assertTrue(writer.toString().contains("Initialize parallelism for scope-52 to 18"));
+ assertTrue(writer.toString().contains("Initialize parallelism for scope-61 to 7"));
assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-49 to 1 from 2"));
- assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-52 to 1 from 20"));
- assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-61 to 1 from 100"));
+ assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-52 to 1 from 18"));
+ assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-61 to 1 from 7"));
} finally {
Util.removeLogAppender("testDecreaseParallelism", PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
}
@@ -181,9 +181,9 @@ public class TestTezGraceParallelism {
}
assertEquals(count, 644);
System.out.println(writer.toString());
- assertTrue(writer.toString().contains("Initialize parallelism for scope-64 to 50"));
+ assertTrue(writer.toString().contains("Initialize parallelism for scope-64 to 45"));
// There are randomness in which task finishes first, so the auto parallelism could result different result
- assertTrue(Pattern.compile("Reduce auto parallelism for vertex: scope-64 to (\\d+)* from 50").matcher(writer.toString()).find());
+ assertTrue(Pattern.compile("Reduce auto parallelism for vertex: scope-64 to (\\d+)* from 45").matcher(writer.toString()).find());
} finally {
Util.removeLogAppender("testIncreaseParallelism", PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
}
@@ -208,7 +208,7 @@ public class TestTezGraceParallelism {
pigServer.registerQuery("C = distinct B;");
pigServer.registerQuery("D = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("E = group D by name;");
- pigServer.registerQuery("F = foreach E generate group as name, AVG(D.age) as avg_age;");
+ pigServer.registerQuery("F = foreach E generate FLATTEN(group) as name, AVG(D.age) as avg_age;");
pigServer.registerQuery("G = join C by name, F by name;");
Iterator<Tuple> iter = pigServer.openIterator("G");
int count = 0;
@@ -218,7 +218,7 @@ public class TestTezGraceParallelism {
}
assertEquals(count, 20);
assertTrue(writer.toString().contains("All predecessors for scope-84 are finished, time to set parallelism for scope-85"));
- assertTrue(writer.toString().contains("Initialize parallelism for scope-85 to 101"));
+ assertTrue(writer.toString().contains("Initialize parallelism for scope-85 to 10"));
} finally {
Util.removeLogAppender("testJoinWithDifferentDepth", PigGraceShuffleVertexManager.class);
}