Posted to commits@pig.apache.org by bi...@apache.org on 2012/08/14 02:08:37 UTC

svn commit: r1372679 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/

Author: billgraham
Date: Tue Aug 14 00:08:37 2012
New Revision: 1372679

URL: http://svn.apache.org/viewvc?rev=1372679&view=rev
Log:
PIG-2871: Refactor signature for PigReducerEstimator (billgraham)
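
For reference, the estimator entry point narrows from three parameters to two;
the Configuration and the list of POLoads are now derived inside the estimator
from the Job and the MapReduceOper, as the diffs below show:

    // Before this commit
    int estimateNumberOfReducers(Configuration conf, List<POLoad> lds, Job job)
            throws IOException;

    // After this commit
    int estimateNumberOfReducers(Job job, MapReduceOper mapReduceOper)
            throws IOException;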

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java
    pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1372679&r1=1372678&r2=1372679&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Aug 14 00:08:37 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2871: Refactor signature for PigReducerEstimator (billgraham)
+
 PIG-2851: Add flag to ant to run tests with a debugger port (billgraham)
 
 PIG-2862: Hardcode certain tuple lengths into the TUPLE BinInterSedes byte identifier (jcoveney)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1372679&r1=1372678&r2=1372679&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Tue Aug 14 00:08:37 2012
@@ -28,6 +28,7 @@ import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.impl.util.UriUtil;
 import org.apache.pig.impl.util.Utils;
 
@@ -64,16 +65,19 @@ public class InputSizeReducerEstimator i
     /**
      * Determines the number of reducers to be used.
      *
-     * @param conf the job configuration
-     * @param lds list of POLoads used in the jobs physical plan
      * @param job job instance
+     * @param mapReduceOper the MapReduce operator of the job
      * @throws java.io.IOException
      */
     @Override
-    public int estimateNumberOfReducers(Configuration conf, List<POLoad> lds, Job job) throws IOException {
+    public int estimateNumberOfReducers(Job job, MapReduceOper mapReduceOper) throws IOException {
+        Configuration conf = job.getConfiguration();
+
         long bytesPerReducer = conf.getLong(BYTES_PER_REDUCER_PARAM, DEFAULT_BYTES_PER_REDUCER);
         int maxReducers = conf.getInt(MAX_REDUCER_COUNT_PARAM, DEFAULT_MAX_REDUCER_COUNT_PARAM);
-        long totalInputFileSize = getTotalInputFileSize(conf, lds, job);
+
+        List<POLoad> poLoads = PlanHelper.getPhysicalOperators(mapReduceOper.mapPlan, POLoad.class);
+        long totalInputFileSize = getTotalInputFileSize(conf, poLoads, job);
 
         log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
             + maxReducers + " totalInputFileSize=" + totalInputFileSize);
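
As a sanity check on this estimate (the defaults are implied by the test
expectations further down: one reducer per 10^9 input bytes, capped at the
MAX_REDUCER_COUNT_PARAM setting), the computation is, in effect:

    // Illustrative only; mirrors the values logged above.
    int numReducers = Math.min(maxReducers,
            Math.max(1, (int) Math.ceil((double) totalInputFileSize / bytesPerReducer)));

With those defaults, 1,998,000,000 input bytes rounds up to 2 reducers,
2,000,000,000 stays at 2, and 2,002,000,000 rounds up to 3, matching the
assertions in TestJobControlCompiler below.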

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1372679&r1=1372678&r2=1372679&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Aug 14 00:08:37 2012
@@ -407,7 +407,7 @@ public class JobControlCompiler{
                 }
             }
 
-            adjustNumReducers(plan, mro, conf, nwJob);
+            adjustNumReducers(plan, mro, nwJob);
 
             if(lds!=null && lds.size()>0){
               for (POLoad ld : lds) {
@@ -763,13 +763,12 @@ public class JobControlCompiler{
      * the number of partitions used in the sampler.
      * @param plan the MR plan
      * @param mro the MR operator
-     * @param conf the configuration
      * @param nwJob the current job
      * @throws IOException
      */
-    public void adjustNumReducers(MROperPlan plan, MapReduceOper mro, Configuration conf,
+    public void adjustNumReducers(MROperPlan plan, MapReduceOper mro,
             org.apache.hadoop.mapreduce.Job nwJob) throws IOException {
-        int jobParallelism = calculateRuntimeReducers(mro, conf, nwJob);
+        int jobParallelism = calculateRuntimeReducers(mro, nwJob);
 
         if (mro.isSampler()) {
             // We need to calculate the final number of reducers of the next job (order-by or skew-join)
@@ -778,7 +777,7 @@ public class JobControlCompiler{
 
             // Here we use the same conf and Job to calculate the runtime #reducers of the next job
             // which is fine as the statistics come from the nextMro's POLoads
-            int nPartitions = calculateRuntimeReducers(nextMro, conf, nwJob);
+            int nPartitions = calculateRuntimeReducers(nextMro, nwJob);
 
             // set the runtime #reducer of the next job as the #partition
             ParallelConstantVisitor visitor =
@@ -787,6 +786,8 @@ public class JobControlCompiler{
         }
         log.info("Setting Parallelism to " + jobParallelism);
 
+        Configuration conf = nwJob.getConfiguration();
+
         // set various parallelism into the job conf for later analysis, PIG-2779
         conf.setInt("pig.info.reducers.default.parallel", pigContext.defaultParallel);
         conf.setInt("pig.info.reducers.requested.parallel", mro.requestedParallelism);
@@ -805,13 +806,12 @@ public class JobControlCompiler{
      * @return the runtimeParallelism
      * @throws IOException
      */
-    private int calculateRuntimeReducers(MapReduceOper mro, Configuration conf,
+    private int calculateRuntimeReducers(MapReduceOper mro,
             org.apache.hadoop.mapreduce.Job nwJob) throws IOException{
         // we don't recalculate for the same job
         if (mro.runtimeParallelism != -1) {
             return mro.runtimeParallelism;
         }
-        List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
 
         int jobParallelism = -1;
 
@@ -820,7 +820,7 @@ public class JobControlCompiler{
         } else if (pigContext.defaultParallel > 0) {
             jobParallelism = pigContext.defaultParallel;
         } else {
-            mro.estimatedParallelism = estimateNumberOfReducers(conf, lds, nwJob);
+            mro.estimatedParallelism = estimateNumberOfReducers(nwJob, mro);
             // reducer estimation could return -1 if it couldn't estimate
             log.info("Could not estimate number of reducers and no requested or default " +
                      "parallelism set. Defaulting to 1 reducer.");
@@ -831,22 +831,25 @@ public class JobControlCompiler{
         mro.runtimeParallelism = jobParallelism;
         return jobParallelism;
     }
+
     /**
      * Looks up the estimator from REDUCER_ESTIMATOR_KEY and invokes it to find the number of
      * reducers to use. If REDUCER_ESTIMATOR_KEY isn't set, defaults to InputSizeReducerEstimator.
-     * @param conf
-     * @param lds
+     * @param job
+     * @param mapReduceOper the MapReduce operator of the job
      * @throws IOException
      */
-    public static int estimateNumberOfReducers(Configuration conf, List<POLoad> lds,
-                                        org.apache.hadoop.mapreduce.Job job) throws IOException {
+    public static int estimateNumberOfReducers(org.apache.hadoop.mapreduce.Job job,
+                                               MapReduceOper mapReduceOper) throws IOException {
+        Configuration conf = job.getConfiguration();
+
         PigReducerEstimator estimator = conf.get(REDUCER_ESTIMATOR_KEY) == null ?
           new InputSizeReducerEstimator() :
           PigContext.instantiateObjectFromParams(conf,
                   REDUCER_ESTIMATOR_KEY, REDUCER_ESTIMATOR_ARG_KEY, PigReducerEstimator.class);
 
         log.info("Using reducer estimator: " + estimator.getClass().getName());
-        int numberOfReducers = estimator.estimateNumberOfReducers(conf, lds, job);
+        int numberOfReducers = estimator.estimateNumberOfReducers(job, mapReduceOper);
         return numberOfReducers;
     }
 

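For anyone plugging in a custom estimator, selection is unchanged: the class
named under REDUCER_ESTIMATOR_KEY (a constant on JobControlCompiler) is still
instantiated via PigContext.instantiateObjectFromParams, falling back to
InputSizeReducerEstimator when the key is unset. A minimal sketch, where
MyEstimator stands in for a hypothetical implementation:

    // Point Pig at a custom PigReducerEstimator before job submission.
    Configuration conf = job.getConfiguration();
    conf.set(JobControlCompiler.REDUCER_ESTIMATOR_KEY, MyEstimator.class.getName());
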
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java?rev=1372679&r1=1372678&r2=1372679&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java Tue Aug 14 00:08:37 2012
@@ -17,14 +17,11 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  * Interface to implement when you want to use a custom approach to estimating
@@ -46,12 +43,10 @@ public interface PigReducerEstimator {
      * Estimate the number of reducers for a given job based on the collection
      * of load funcs passed.
      *
-     * @param conf the job configuration
-     * @param poLoadList list of POLoads used in the jobs physical plan
      * @param job job instance
+     * @param mapReduceOper the MapReduce operator of the job
      * @return the number of reducers to use, or -1 if the count couldn't be estimated
      * @throws IOException
      */
-    public int estimateNumberOfReducers(Configuration conf, List<POLoad> poLoadList, Job job)
-        throws IOException;
+    public int estimateNumberOfReducers(Job job, MapReduceOper mapReduceOper) throws IOException;
 }
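
With the new signature, an implementation receives the whole MapReduceOper and
pulls what it needs from the map plan itself. A hypothetical sketch (the class
and its fixed return value are illustrative only), mirroring the PlanHelper
lookup that InputSizeReducerEstimator now performs:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigReducerEstimator;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;

    public class TenReducersEstimator implements PigReducerEstimator {
        @Override
        public int estimateNumberOfReducers(Job job, MapReduceOper mapReduceOper)
                throws IOException {
            // Fetch the POLoads from the map plan, as InputSizeReducerEstimator does.
            List<POLoad> loads =
                    PlanHelper.getPhysicalOperators(mapReduceOper.mapPlan, POLoad.class);
            // Per the interface contract, return -1 when no estimate can be made.
            return loads.isEmpty() ? -1 : 10;
        }
    }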

Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1372679&r1=1372678&r2=1372679&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Tue Aug 14 00:08:37 2012
@@ -42,12 +42,12 @@ import javax.tools.JavaFileObject;
 import javax.tools.StandardJavaFileManager;
 import javax.tools.ToolProvider;
 
-import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
@@ -134,20 +134,20 @@ public class TestJobControlCompiler {
 
     @Test
     public void testEstimateNumberOfReducers() throws Exception {
-        Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(CONF,
-                Lists.newArrayList(createPOLoadWithSize(2L * 1000 * 1000 * 999,
-                        new PigStorage())),
-                new org.apache.hadoop.mapreduce.Job(CONF)));
-
-        Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(CONF,
-                Lists.newArrayList(createPOLoadWithSize(2L * 1000 * 1000 * 1000,
-                        new PigStorage())),
-                new org.apache.hadoop.mapreduce.Job(CONF)));
-
-        Assert.assertEquals(3, JobControlCompiler.estimateNumberOfReducers(CONF,
-                Lists.newArrayList(createPOLoadWithSize(2L * 1000 * 1000 * 1001,
-                        new PigStorage())),
-                new org.apache.hadoop.mapreduce.Job(CONF)));
+        Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(
+            new Job(CONF), createMockPOLoadMapReduceOper(2L * 1000 * 1000 * 999)));
+
+        Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(
+            new Job(CONF), createMockPOLoadMapReduceOper(2L * 1000 * 1000 * 1000)));
+
+        Assert.assertEquals(3, JobControlCompiler.estimateNumberOfReducers(
+            new Job(CONF), createMockPOLoadMapReduceOper(2L * 1000 * 1000 * 1001)));
+    }
+
+    private static MapReduceOper createMockPOLoadMapReduceOper(long size) throws Exception {
+        MapReduceOper mro = new MapReduceOper(new OperatorKey());
+        mro.mapPlan.add(createPOLoadWithSize(size, new PigStorage()));
+        return mro;
     }
 
     public static POLoad createPOLoadWithSize(long size, LoadFunc loadFunc) throws Exception {