You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) was not preserved in this plain-text version.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/02/10 22:23:24 UTC

svn commit: r1729740 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ test/org/apache/pig/tez/

Author: rohini
Date: Wed Feb 10 21:23:24 2016
New Revision: 1729740

URL: http://svn.apache.org/viewvc?rev=1729740&view=rev
Log:
PIG-4802: Autoparallelism should estimate less when there is combiner (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
    pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
    pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1729740&r1=1729739&r2=1729740&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Feb 10 21:23:24 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4802: Autoparallelism should estimate less when there is combiner (rohini)
+
 PIG-4761: Add more information to front end error messages (eyal via daijy)
 
 PIG-4792: Do not add java and sun system properties to jobconf (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1729740&r1=1729739&r2=1729740&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Wed Feb 10 21:23:24 2016
@@ -146,7 +146,7 @@ public class TezOperator extends Operato
 
     private boolean useGraceParallelism = false;
 
-    private double parallelismFactor = -1;
+    private Map<OperatorKey, Double> parallelismFactorPerSuccessor;
 
     private Boolean intermediateReducer = null;
 
@@ -658,13 +658,25 @@ public class TezOperator extends Operato
         return useGraceParallelism;
     }
 
-    public double getParallelismFactor() throws VisitorException {
-        if (parallelismFactor == -1) {
-            TezParallelismFactorVisitor parallelismFactorVisitor = new TezParallelismFactorVisitor(plan, getOperatorKey().toString());
+    public double getParallelismFactor(TezOperator successor) throws VisitorException {
+        if (parallelismFactorPerSuccessor == null) {
+            parallelismFactorPerSuccessor = new HashMap<OperatorKey, Double>();
+        }
+        Double factor = parallelismFactorPerSuccessor.get(successor.getOperatorKey());
+        if (factor == null) {
+            // We determine different parallelism factors for different successors (edges).
+            // For eg: If we have two successors, one with combine plan and other without
+            // we want to compute lesser parallelism factor for the one with the combine plan
+            // as that edge will get less data.
+            // TODO: To be more perfect, we need only look at the split sub-plan that
+            // writes to that successor edge. If there is a FILTER in one sub-plan it is accounted
+            // for all the successors now which is not right.
+            TezParallelismFactorVisitor parallelismFactorVisitor = new TezParallelismFactorVisitor(this, successor);
             parallelismFactorVisitor.visit();
-            parallelismFactor = parallelismFactorVisitor.getFactor();
+            factor = parallelismFactorVisitor.getFactor();
+            parallelismFactorPerSuccessor.put(successor.getOperatorKey(), factor);
         }
-        return parallelismFactor;
+        return factor;
     }
 
     public Boolean isIntermediateReducer() throws IOException {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1729740&r1=1729739&r2=1729740&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Wed Feb 10 21:23:24 2016
@@ -26,8 +26,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
@@ -42,6 +44,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -55,7 +58,7 @@ import org.apache.tez.dag.api.EdgeProper
  *
  * Since currently it is only possible to reduce the parallelism
  * estimation is exaggerated and will rely on Tez runtime to
- * descrease the parallelism
+ * decrease the parallelism
  */
 public class TezOperDependencyParallelismEstimator implements TezParallelismEstimator {
 
@@ -64,6 +67,14 @@ public class TezOperDependencyParallelis
     static final double DEFAULT_FILTER_FACTOR = 0.7;
     static final double DEFAULT_LIMIT_FACTOR = 0.1;
 
+    // Most of the cases distinct does not reduce much.
+    // So keeping it high at 0.9
+    static final double DEFAULT_DISTINCT_FACTOR = 0.9;
+
+    // Most of the cases aggregation can reduce by a lot.
+    // But keeping at 0.7 to take worst case scenarios into account
+    static final double DEFAULT_AGGREGATION_FACTOR = 0.7;
+
     private PigContext pc;
 
     @Override
@@ -118,7 +129,7 @@ public class TezOperDependencyParallelis
                 //For cases like Union we can just limit to sum of pred vertices parallelism
                 boolean applyFactor = !tezOper.isUnion();
                 if (!pred.isVertexGroup() && applyFactor) {
-                    predParallelism = predParallelism * pred.getParallelismFactor();
+                    predParallelism = predParallelism * pred.getParallelismFactor(tezOper);
                 }
                 estimatedParallelism += predParallelism;
             }
@@ -175,9 +186,24 @@ public class TezOperDependencyParallelis
     public static class TezParallelismFactorVisitor extends PhyPlanVisitor {
         private double factor = 1;
         private String outputKey;
-        public TezParallelismFactorVisitor(PhysicalPlan plan, String outputKey) {
-            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
-            this.outputKey = outputKey;
+        private TezOperator tezOp;
+
+        public TezParallelismFactorVisitor(TezOperator tezOp, TezOperator successor) {
+            super(tezOp.plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(tezOp.plan));
+            this.tezOp = tezOp;
+            this.outputKey = tezOp.getOperatorKey().toString();
+
+            if (successor != null) {
+                // Map side combiner
+                TezEdgeDescriptor edge = tezOp.outEdges.get(successor.getOperatorKey());
+                if (!edge.combinePlan.isEmpty()) {
+                    if (successor.isDistinct()) {
+                        factor = DEFAULT_DISTINCT_FACTOR;
+                    } else {
+                        factor = DEFAULT_AGGREGATION_FACTOR;
+                    }
+                }
+            }
         }
 
         @Override
@@ -195,11 +221,17 @@ public class TezOperDependencyParallelis
         @Override
         public void visitPOForEach(POForEach nfe) throws VisitorException {
             List<Boolean> flattens = nfe.getToBeFlattened();
+            List<PhysicalPlan> inputPlans = nfe.getInputPlans();
             boolean containFlatten = false;
-            for (boolean flatten : flattens) {
-                if (flatten) {
-                    containFlatten = true;
-                    break;
+            for (int i = 0; i < flattens.size(); i++) {
+                if (flattens.get(i)) {
+                    PhysicalPlan inputPlan = inputPlans.get(i);
+                    PhysicalOperator root = inputPlan.getRoots().get(0);
+                    if (root instanceof POProject
+                            && root.getResultType() == DataType.BAG) {
+                        containFlatten = true;
+                        break;
+                    }
                 }
             }
             if (containFlatten) {
@@ -227,6 +259,12 @@ public class TezOperDependencyParallelis
             // JoinPackager is equivalent to a foreach flatten after shuffle
             if (pkg.getPkgr() instanceof JoinPackager) {
                 factor *= DEFAULT_FLATTEN_FACTOR;
+            } else if (pkg.getPkgr() instanceof CombinerPackager) {
+                if (tezOp.isDistinct()) {
+                    factor *= DEFAULT_DISTINCT_FACTOR;
+                } else {
+                    factor *= DEFAULT_AGGREGATION_FACTOR;
+                }
             }
         }
 

Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1729740&r1=1729739&r2=1729740&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Wed Feb 10 21:23:24 2016
@@ -40,6 +40,7 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
 import org.apache.pig.data.Tuple;
@@ -299,6 +300,23 @@ public class TestTezAutoParallelism {
     }
 
     @Test
+    public void testFlattenParallelism() throws IOException{
+        String outputDir = "/tmp/testFlattenParallelism";
+        String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"
+                + "B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"
+                + "C = join A by name, B by name using 'skewed' parallel 1;"
+                + "C1 = group C by A::name;"
+                + "C2 = FOREACH C1 generate group, FLATTEN(C);"
+                + "D = group C2 by group;"
+                + "E = foreach D generate group, COUNT(C2.A::name);"
+                + "STORE E into '" + outputDir + "/finalout';";
+        String log = testAutoParallelism(script, outputDir, true, TezJobCompiler.class, TezDagBuilder.class);
+        assertTrue(log.contains("For vertex - scope-74: parallelism=10"));
+        assertTrue(log.contains("For vertex - scope-75: parallelism=70"));
+        assertTrue(log.contains("Total estimated parallelism is 89"));
+    }
+
+    @Test
     public void testIncreaseIntermediateParallelism1() throws IOException{
         // User specified parallelism is overriden for intermediate step
         String outputDir = "/tmp/testIncreaseIntermediateParallelism";
@@ -312,7 +330,7 @@ public class TestTezAutoParallelism {
         // Parallelism of C should be increased
         assertTrue(log.contains("Increased requested parallelism of scope-59 to 4"));
         assertEquals(1, StringUtils.countMatches(log, "Increased requested parallelism"));
-        assertTrue(log.contains("Total estimated parallelism is 52"));
+        assertTrue(log.contains("Total estimated parallelism is 40"));
     }
 
     @Test
@@ -358,11 +376,15 @@ public class TestTezAutoParallelism {
     }
 
     private String testIncreaseIntermediateParallelism(String script, String outputDir, boolean sortAndCheck) throws IOException {
+        return testAutoParallelism(script, outputDir, sortAndCheck, ParallelismSetter.class, TezJobCompiler.class);
+    }
+
+    private String testAutoParallelism(String script, String outputDir, boolean sortAndCheck, Class... classesToLog) throws IOException {
         NodeIdGenerator.reset();
         PigServer.resetScope();
         StringWriter writer = new StringWriter();
         // When there is a combiner operation involved user specified parallelism is overriden
-        Util.createLogAppender("testIncreaseIntermediateParallelism", writer, ParallelismSetter.class, TezJobCompiler.class);
+        Util.createLogAppender("testAutoParallelism", writer, classesToLog);
         try {
             pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
             pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
@@ -390,7 +412,7 @@ public class TestTezAutoParallelism {
             }
             return writer.toString();
         } finally {
-            Util.removeLogAppender("testIncreaseIntermediateParallelism", ParallelismSetter.class, TezJobCompiler.class);
+            Util.removeLogAppender("testAutoParallelism", classesToLog);
             Util.deleteFile(cluster, outputDir);
         }
     }

Modified: pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1729740&r1=1729739&r2=1729740&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java Wed Feb 10 21:23:24 2016
@@ -139,11 +139,11 @@ public class TestTezGraceParallelism {
                     .getTuplesFromConstantTupleStrings(new String[] {
                             "('F',1349L)", "('M',1373L)"});
             Util.checkQueryOutputsAfterSort(iter, expectedResults);
-            assertTrue(writer.toString().contains("Initialize parallelism for scope-52 to 20"));
-            assertTrue(writer.toString().contains("Initialize parallelism for scope-61 to 100"));
+            assertTrue(writer.toString().contains("Initialize parallelism for scope-52 to 18"));
+            assertTrue(writer.toString().contains("Initialize parallelism for scope-61 to 7"));
             assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-49 to 1 from 2"));
-            assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-52 to 1 from 20"));
-            assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-61 to 1 from 100"));
+            assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-52 to 1 from 18"));
+            assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-61 to 1 from 7"));
         } finally {
             Util.removeLogAppender("testDecreaseParallelism", PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
         }
@@ -181,9 +181,9 @@ public class TestTezGraceParallelism {
             }
             assertEquals(count, 644);
             System.out.println(writer.toString());
-            assertTrue(writer.toString().contains("Initialize parallelism for scope-64 to 50"));
+            assertTrue(writer.toString().contains("Initialize parallelism for scope-64 to 45"));
             // There are randomness in which task finishes first, so the auto parallelism could result different result
-            assertTrue(Pattern.compile("Reduce auto parallelism for vertex: scope-64 to (\\d+)* from 50").matcher(writer.toString()).find());
+            assertTrue(Pattern.compile("Reduce auto parallelism for vertex: scope-64 to (\\d+)* from 45").matcher(writer.toString()).find());
         } finally {
             Util.removeLogAppender("testIncreaseParallelism", PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
         }
@@ -208,7 +208,7 @@ public class TestTezGraceParallelism {
             pigServer.registerQuery("C = distinct B;");
             pigServer.registerQuery("D = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
             pigServer.registerQuery("E = group D by name;");
-            pigServer.registerQuery("F = foreach E generate group as name, AVG(D.age) as avg_age;");
+            pigServer.registerQuery("F = foreach E generate FLATTEN(group) as name, AVG(D.age) as avg_age;");
             pigServer.registerQuery("G = join C by name, F by name;");
             Iterator<Tuple> iter = pigServer.openIterator("G");
             int count = 0;
@@ -218,7 +218,7 @@ public class TestTezGraceParallelism {
             }
             assertEquals(count, 20);
             assertTrue(writer.toString().contains("All predecessors for scope-84 are finished, time to set parallelism for scope-85"));
-            assertTrue(writer.toString().contains("Initialize parallelism for scope-85 to 101"));
+            assertTrue(writer.toString().contains("Initialize parallelism for scope-85 to 10"));
         } finally {
             Util.removeLogAppender("testJoinWithDifferentDepth", PigGraceShuffleVertexManager.class);
         }