Posted to commits@pig.apache.org by dv...@apache.org on 2012/04/28 01:34:04 UTC

svn commit: r1331637 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/

Author: dvryaboy
Date: Fri Apr 27 23:34:03 2012
New Revision: 1331637

URL: http://svn.apache.org/viewvc?rev=1331637&view=rev
Log:
PIG-2652:  Skew join and order by don't trigger reducer estimation (dvryaboy)
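
This patch moves reducer-count selection out of getJob() and into a new
JobControlCompiler.adjustNumReducers() (full diff below), so that the size-based
estimate also applies to the sampling-based plans built for skew join and order by.
A standalone sketch of the selection order it applies (a simplified, hypothetical
helper that mirrors adjustNumReducers; not code from the patch):

    // Precedence: explicit PARALLEL, then default_parallel, then the size-based estimate.
    static int chooseParallelism(int requestedParallelism, int defaultParallel,
                                 int estimatedParallelism, boolean skewedJoinOrGlobalSort) {
        int parallelism;
        if (requestedParallelism > 0) {            // PARALLEL clause in the query
            parallelism = requestedParallelism;
        } else if (defaultParallel > 0) {          // 'set default_parallel' in the script
            parallelism = defaultParallel;
        } else {
            parallelism = estimatedParallelism;    // estimate from input size
        }
        // Skew join and order by compile with a parallelism of 1 when none is specified;
        // treat that 1 as "unset" and fall back to the estimate. This is the fix.
        if (skewedJoinOrGlobalSort && parallelism == 1) {
            parallelism = estimatedParallelism;
        }
        return parallelism;
    }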

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
    pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
    pig/trunk/test/org/apache/pig/test/TestPigRunner.java
    pig/trunk/test/org/apache/pig/test/TestPigStats.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1331637&r1=1331636&r2=1331637&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Apr 27 23:34:03 2012
@@ -112,6 +112,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-2652:  Skew join and order by don't trigger reducer estimation (dvryaboy)
+
 PIG-2616: JobControlCompiler.getInputSizeFromLoader must handle exceptions from LoadFunc.getStatistics (billgraham)
 
 PIG-2644: Piggybank's HadoopJobHistoryLoader throws NPE when reading broken history file (herberts via daijy)

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1331637&r1=1331636&r2=1331637&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Fri Apr 27 23:34:03 2012
@@ -27,7 +27,6 @@ import java.io.InputStreamReader;
 import java.io.PrintStream;
 import java.io.StringReader;
 import java.io.StringWriter;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -74,7 +73,6 @@ import org.apache.pig.impl.plan.Compilat
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
@@ -82,8 +80,6 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
 import org.apache.pig.newplan.logical.expression.ScalarExpression;
 import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
-import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
-import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LOStore;
@@ -93,7 +89,6 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.visitor.CastLineageSetter;
 import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
 import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator;
-import org.apache.pig.newplan.logical.visitor.ProjStarInUdfExpander;
 import org.apache.pig.newplan.logical.visitor.ScalarVisitor;
 import org.apache.pig.newplan.logical.visitor.SchemaAliasVisitor;
 import org.apache.pig.newplan.logical.visitor.TypeCheckingRelVisitor;
@@ -109,7 +104,6 @@ import org.apache.pig.tools.parameters.P
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 
@@ -1201,10 +1195,10 @@ public class PigServer {
             return exgen.getExamples();
         } catch (ExecException e) {
             e.printStackTrace(System.out);
-            throw new IOException("ExecException : " + e.getMessage());
+            throw new IOException("ExecException" , e);
         } catch (Exception e) {
             e.printStackTrace(System.out);
-            throw new IOException("Exception : " + e.getMessage());
+            throw new IOException("Exception ", e);
         }
      
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1331637&r1=1331636&r2=1331637&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Apr 27 23:34:03 2012
@@ -58,6 +58,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -261,7 +262,7 @@ public class JobControlCompiler{
                 if(mro instanceof NativeMapReduceOper) {
                     return null;
                 }
-                Job job = getJob(mro, conf, pigContext);
+                Job job = getJob(plan, mro, conf, pigContext);
                 jobMroMap.put(job, mro);
                 jobCtrl.addJob(job);
             }
@@ -327,7 +328,7 @@ public class JobControlCompiler{
      * @throws JobCreationException
      */
     @SuppressWarnings({ "unchecked", "deprecation" })
-    private Job getJob(MapReduceOper mro, Configuration config, PigContext pigContext) throws JobCreationException{
+    private Job getJob(MROperPlan plan, MapReduceOper mro, Configuration config, PigContext pigContext) throws JobCreationException{
         org.apache.hadoop.mapreduce.Job nwJob = null;
         
         try{
@@ -376,6 +377,8 @@ public class JobControlCompiler{
         }
                 
         try{        
+            adjustNumReducers(plan, mro, conf, nwJob);
+
             //Process the POLoads
             List<POLoad> lds = PlanHelper.getLoads(mro.mapPlan);
             
@@ -594,14 +597,6 @@ public class JobControlCompiler{
                 nwJob.setMapperClass(PigMapReduce.Map.class);
                 nwJob.setReducerClass(PigMapReduce.Reduce.class);
                 
-                // first check the PARALLE in query, then check the defaultParallel in PigContext, and last do estimation
-                if (mro.requestedParallelism > 0)
-                    nwJob.setNumReduceTasks(mro.requestedParallelism);
-		else if (pigContext.defaultParallel > 0)
-                    conf.set("mapred.reduce.tasks", ""+pigContext.defaultParallel);
-                else
-                    estimateNumberOfReducers(conf, lds, nwJob);
-                
                 if (mro.customPartitioner != null)
                 	nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
 
@@ -743,9 +738,42 @@ public class JobControlCompiler{
         }
     }
 
+    public void adjustNumReducers(MROperPlan plan, MapReduceOper mro, Configuration conf,
+            org.apache.hadoop.mapreduce.Job nwJob) throws IOException {
+        List<PhysicalOperator> loads = mro.mapPlan.getRoots();
+        List<POLoad> lds = new ArrayList<POLoad>();
+        for (PhysicalOperator ld : loads) {
+            lds.add((POLoad)ld);
+        }
+        int jobParallelism = -1;
+        int estimatedParallelism = estimateNumberOfReducers(conf, lds, nwJob);
+        if (mro.requestedParallelism > 0) {
+            jobParallelism = mro.requestedParallelism;
+        } else if (pigContext.defaultParallel > 0) {
+            jobParallelism = pigContext.defaultParallel;
+        } else {
+            jobParallelism = estimatedParallelism;
+        }
+        // Special case: Skewed Join and Order set parallelism to 1 even when no parallelism is specified.
+        if ((mro.isSkewedJoin() || mro.isGlobalSort()) && jobParallelism == 1) {
+            jobParallelism = estimatedParallelism;
+        }
+        if (mro.isSampler() && jobParallelism == 1) {
+            // Note: this is suboptimal, as the number of reducers communicated to the
+            // sampler is only based on the sampler inputs, meaning, the left side in the case
+            // of a skewed join. Ideally, we'd take into account the right side, as well.
+            ParallelConstantVisitor visitor =
+              new ParallelConstantVisitor(mro.reducePlan, estimatedParallelism);
+            visitor.visit();
+        }
+        log.info("Setting Parallelism to " + jobParallelism);
+        mro.requestedParallelism = jobParallelism;
+        conf.setInt("mapred.reduce.tasks", jobParallelism);
+    }
+
     /**
      * Looks up the estimator from REDUCER_ESTIMATOR_KEY and invokes it to find the number of
-     * reducers to use. If REDUCER_ESTIMATOR_KEY isn't set, defaults to InputSizeReducerEstimator
+     * reducers to use. If REDUCER_ESTIMATOR_KEY isn't set, defaults to InputSizeReducerEstimator.
      * @param conf
      * @param lds
      * @throws IOException
@@ -759,10 +787,6 @@ public class JobControlCompiler{
 
         log.info("Using reducer estimator: " + estimator.getClass().getName());
         int numberOfReducers = estimator.estimateNumberOfReducers(conf, lds, job);
-        conf.setInt("mapred.reduce.tasks", numberOfReducers);
-
-        log.info("Neither PARALLEL nor default parallelism is set for this job. Setting number of "
-                + "reducers to " + numberOfReducers);
         return numberOfReducers;
     }
 
@@ -1398,4 +1422,37 @@ public class JobControlCompiler{
             }
         }
     }   
+
+    private static class ParallelConstantVisitor extends PhyPlanVisitor {
+
+        private int rp;
+
+        private boolean replaced = false;
+
+        public ParallelConstantVisitor(PhysicalPlan plan, int rp) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+            this.rp = rp;
+        }
+
+        @Override
+        public void visitConstant(ConstantExpression cnst) throws VisitorException {
+            if (cnst.getRequestedParallelism() == -1) {
+                Object obj = cnst.getValue();
+                if (obj instanceof Integer) {
+                    if (replaced) {
+                        // sample job should have only one ConstantExpression
+                        throw new VisitorException("Invalid reduce plan: more " +
+                                       "than one ConstantExpression found in sampling job");
+                    }
+                    cnst.setValue(rp);
+                    cnst.setRequestedParallelism(rp);
+                    replaced = true;
+                }
+            }
+        }
+
+        boolean isReplaced() { return replaced; }
+    }
+
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java?rev=1331637&r1=1331636&r2=1331637&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java Fri Apr 27 23:34:03 2012
@@ -23,19 +23,14 @@ import java.util.List;
 
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
@@ -64,9 +59,13 @@ public class LimitAdjuster extends MROpP
     @Override
     public void visitMROp(MapReduceOper mr) throws VisitorException {
         // Look for map reduce operators which contains limit operator.
-        // If so and the requestedParallelism > 1, add one additional map-reduce
-        // operator with 1 reducer into the original plan
-        if ((mr.limit!=-1 || mr.limitPlan!=null) && mr.requestedParallelism!=1)
+        // If so, add one additional map-reduce
+        // operator with 1 reducer into the original plan.
+
+        // TODO: This new MR job can be skipped if at runtime we discover that
+        // its parent only has a single reducer (mr.requestedParallelism!=1).
+        // This check MUST happen at runtime since that's when reducer estimation happens.
+        if ((mr.limit!=-1 || mr.limitPlan!=null) )
         {
             opsToAdjust.add(mr);
         }

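Because the reducer count is now decided at job-submission time, LimitAdjuster can no
longer prove at compile time that the parent job has a single reducer, so it always
appends the extra single-reducer job for LIMIT (hence the expected job counts in
TestPigRunner and TestPigStats below going from 3 to 4). A minimal PigServer sequence
that exercises this path, mirroring the testLimitAutoReducer change further down (the
input path is the one used by that test):

    // ORDER BY picks up the estimated parallelism; LIMIT then gets its own 1-reducer job.
    pigServer.registerQuery("A = load 'table_testLimitAutoReducer';");
    pigServer.registerQuery("B = order A by $0;");
    pigServer.registerQuery("C = limit B 2;");
    Iterator<Tuple> iter = pigServer.openIterator("C");
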
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1331637&r1=1331636&r2=1331637&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Apr 27 23:34:03 2012
@@ -38,7 +38,6 @@ import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
@@ -65,7 +64,6 @@ import org.apache.pig.impl.plan.Compilat
 import org.apache.pig.impl.util.ConfigurationValidator;
 import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.ScriptState;
@@ -516,7 +514,6 @@ public class MapReduceLauncher extends L
             pc.getProperties().getProperty(
                     "last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
         
-        //String prop = System.getProperty("pig.exec.nocombiner");
         String prop = pc.getProperties().getProperty("pig.exec.nocombiner");
         if (!pc.inIllustrator && !("true".equals(prop)))  {
             boolean doMapAgg = 
@@ -532,10 +529,12 @@ public class MapReduceLauncher extends L
         SampleOptimizer so = new SampleOptimizer(plan, pc);
         so.visit();
         
+        // We must ensure that there is only 1 reducer for a limit. Add a single-reducer job.
+        if (!pc.inIllustrator) {
         LimitAdjuster la = new LimitAdjuster(plan, pc);
         la.visit();
         la.adjust();
-        
+        }
         // Optimize to use secondary sort key if possible
         prop = pc.getProperties().getProperty("pig.exec.nosecondarykey");
         if (!pc.inIllustrator && !("true".equals(prop)))  {
@@ -621,6 +620,7 @@ public class MapReduceLauncher extends L
      */
     class JobControlThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
         
+        @Override
         public void uncaughtException(Thread thread, Throwable throwable) {
             jobControlExceptionStackTrace = getStackStraceStr(throwable);
             try {	

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=1331637&r1=1331636&r2=1331637&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Fri Apr 27 23:34:03 2012
@@ -126,33 +126,6 @@ public class SampleOptimizer extends MRO
             return;
         }
         MapReduceOper succ = succs.get(0);
-        
-        // set/estimate the parallelism
-        if (succ.requestedParallelism == 1) {
-            List<PhysicalOperator> loads = pred.mapPlan.getRoots();
-            List<POLoad> lds = new ArrayList<POLoad>();
-            for (PhysicalOperator ld : loads) {
-                lds.add((POLoad)ld);
-            }
-            Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
-            int rp = 1;
-            try {
-                rp = JobControlCompiler.estimateNumberOfReducers(
-                        conf, lds, new org.apache.hadoop.mapreduce.Job(conf));
-            } catch (IOException e) {
-                log.warn("Failed to estimate number of reducers", e);
-            }
-            
-            if (rp > 1) {
-                ParallelConstantVisitor visitor = new ParallelConstantVisitor(mr.reducePlan, rp);
-                visitor.visit();
-                if (visitor.isReplaced()) {
-                    succ.requestedParallelism = rp;
-                    log.info(" Setting number of reducers for order by to " + rp);
-                }
-            }
-        }
-        
         if (pred.mapPlan == null || pred.mapPlan.size() != 2) {
             log.debug("Predecessor has more than just load+store in the map");
             return;
@@ -273,35 +246,4 @@ public class SampleOptimizer extends MRO
 	    	}
 		}
     }
-    
-    private static class ParallelConstantVisitor extends PhyPlanVisitor {
-
-        private int rp;
-        
-        private boolean replaced = false;
-        
-        public ParallelConstantVisitor(PhysicalPlan plan, int rp) {
-            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
-                    plan));
-            this.rp = rp;
-        }
-        
-        public void visitConstant(ConstantExpression cnst) throws VisitorException {            
-            if (cnst.getRequestedParallelism() == -1) {
-                Object obj = cnst.getValue();
-                if (obj instanceof Integer) {
-                    if (replaced) {
-                        // sample job should have only one ConstantExpression
-                        throw new VisitorException("Invalid reduce plan: more " +
-                        		"than one ConstantExpression found in sampling job");
-                    }
-                    cnst.setValue(rp);                    
-                    cnst.setRequestedParallelism(rp);
-                    replaced = true;
-                }
-            }
-        }
-     
-        boolean isReplaced() { return replaced; }
-    }
 }

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1331637&r1=1331636&r2=1331637&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Fri Apr 27 23:34:03 2012
@@ -252,7 +252,7 @@ public class TestEvalPipeline2 {
         Assert.assertTrue(tup.get(0) instanceof DataBag);
         DataBag db = (DataBag)tup.get(0);
         Assert.assertTrue(db.iterator().hasNext());
-        Tuple innerTuple = (Tuple)db.iterator().next();
+        Tuple innerTuple = db.iterator().next();
         Assert.assertTrue(innerTuple.get(0)==null);
 
         //tuple 5 
@@ -1487,6 +1487,7 @@ public class TestEvalPipeline2 {
     }    
     
     static public class UDFWithNonStandardType extends EvalFunc<Tuple>{
+        @Override
         public Tuple exec(Tuple input) throws IOException {
             Tuple t = TupleFactory.getInstance().newTuple();
             t.append(new ArrayList<Integer>());
@@ -1655,9 +1656,9 @@ public class TestEvalPipeline2 {
         
         Util.createInputFile(cluster, "table_testLimitAutoReducer", input);
         
-        pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "9");
-        pigServer.registerQuery("A = load 'table_testLimitAutoReducer' as (a0, a1);");
-        pigServer.registerQuery("B = order A by a0;");
+        pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "16");
+        pigServer.registerQuery("A = load 'table_testLimitAutoReducer';");
+        pigServer.registerQuery("B = order A by $0;");
         pigServer.registerQuery("C = limit B 2;");
         
         Iterator<Tuple> iter = pigServer.openIterator("C");

Modified: pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=1331637&r1=1331636&r2=1331637&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Fri Apr 27 23:34:03 2012
@@ -17,7 +17,8 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.util.Iterator;
@@ -40,6 +41,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.data.DataType;
@@ -618,9 +620,19 @@ public class TestJobSubmission {
         PhysicalPlan pp = Util.buildPp(ps, query);
 
         MROperPlan mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
+        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
+        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+        JobControl jobControl = jcc.compile(mrPlan, query);
+
         assertEquals(2, mrPlan.size());     
         
+        // Simulate the first job having run so estimation kicks in.
         MapReduceOper sort = mrPlan.getLeaves().get(0);        
+        jcc.updateMROpPlan(jobControl.getReadyJobs());
+        FileLocalizer.create(sort.getQuantFile(), pc);
+        jcc.compile(mrPlan, query);
+
+        sort = mrPlan.getLeaves().get(0);
         long reducer=Math.min((long)Math.ceil(new File("test/org/apache/pig/test/data/passwd").length()/100.0), 10);
         assertEquals(reducer, sort.getRequestedParallelism());
         
@@ -630,6 +642,7 @@ public class TestJobSubmission {
         pp = Util.buildPp(ps, query);
         
         mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);               
+
         assertEquals(2, mrPlan.size());     
         
         sort = mrPlan.getLeaves().get(0);        
@@ -658,6 +671,17 @@ public class TestJobSubmission {
         mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
         assertEquals(3, mrPlan.size());     
         
+        // Simulate the first 2 jobs having run so estimation kicks in.
+        sort = mrPlan.getLeaves().get(0);
+        FileLocalizer.create(sort.getQuantFile(), pc);
+
+        jobControl = jcc.compile(mrPlan, query);
+        Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", ((POLoad) sort.mapPlan.getRoots().get(0)).getLFile().getFileName());
+        jcc.updateMROpPlan(jobControl.getReadyJobs());
+        jobControl = jcc.compile(mrPlan, query);
+        jcc.updateMROpPlan(jobControl.getReadyJobs());
+
+        jobControl = jcc.compile(mrPlan, query);
         sort = mrPlan.getLeaves().get(0);       
         assertEquals(reducer, sort.getRequestedParallelism());
     }

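For reference, the assertion above reproduces the input-size estimate: reducers equal
min(ceil(total input bytes / bytes per reducer), max reducers). Judging by the 100.0
and 10 in the expression, the test appears to configure 100 bytes per reducer and a
cap of 10; the shipped defaults are larger. A minimal sketch of that arithmetic
(hypothetical helper, not test code):

    // reducers = min(max(1, ceil(totalInputBytes / bytesPerReducer)), maxReducers)
    static long estimateReducers(long totalInputBytes, long bytesPerReducer, long maxReducers) {
        long reducers = (long) Math.ceil((double) totalInputBytes / (double) bytesPerReducer);
        reducers = Math.max(reducers, 1);        // never fewer than one reducer
        return Math.min(reducers, maxReducers);  // respect the configured cap
    }
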
Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1331637&r1=1331636&r2=1331637&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Fri Apr 27 23:34:03 2012
@@ -217,7 +217,7 @@ public class TestPigRunner {
         try {
             PigStats stats = PigRunner.run(args, new TestNotificationListener());
             assertTrue(stats.isSuccessful());
-            assertTrue(stats.getJobGraph().size() == 3);
+            assertTrue(stats.getJobGraph().size() == 4);
             assertTrue(stats.getJobGraph().getSinks().size() == 1);
             assertTrue(stats.getJobGraph().getSources().size() == 1);
             JobStats js = (JobStats) stats.getJobGraph().getSinks().get(0);

Modified: pig/trunk/test/org/apache/pig/test/TestPigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStats.java?rev=1331637&r1=1331636&r2=1331637&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigStats.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigStats.java Fri Apr 27 23:34:03 2012
@@ -161,8 +161,7 @@ public class TestPigStats  {
             PhysicalPlan pp = pig.getPigContext().getExecutionEngine().compile(lp,
                     null);
             MROperPlan mp = getMRPlan(pp, pig.getPigContext());
-            
-            assertEquals(3, mp.getKeys().size());
+            assertEquals(4, mp.getKeys().size());
             
             MapReduceOper mro = mp.getRoots().get(0);
             assertEquals("A,B,C", getAlias(mro));