Posted to commits@pig.apache.org by bi...@apache.org on 2012/08/14 02:08:37 UTC
svn commit: r1372679 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/
Author: billgraham
Date: Tue Aug 14 00:08:37 2012
New Revision: 1372679
URL: http://svn.apache.org/viewvc?rev=1372679&view=rev
Log:
PIG-2871: Refactor signature for PigReducerEstimator (billgraham)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java
pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
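
In short, this commit narrows the PigReducerEstimator contract: callers now pass the Job and the MapReduceOper, and the estimator derives the Configuration and the POLoad list from those itself. Reconstructed from the hunks below (this summary is not part of the patch text), the interface method changes as follows:

    // before this commit
    public int estimateNumberOfReducers(Configuration conf, List<POLoad> poLoadList, Job job)
            throws IOException;

    // after this commit
    public int estimateNumberOfReducers(Job job, MapReduceOper mapReduceOper) throws IOException;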
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1372679&r1=1372678&r2=1372679&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Aug 14 00:08:37 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2871: Refactor signature for PigReducerEstimator (billgraham)
+
PIG-2851: Add flag to ant to run tests with a debugger port (billgraham)
PIG-2862: Hardcode certain tuple lengths into the TUPLE BinInterSedes byte identifier (jcoveney)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1372679&r1=1372678&r2=1372679&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Tue Aug 14 00:08:37 2012
@@ -28,6 +28,7 @@ import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.impl.util.UriUtil;
import org.apache.pig.impl.util.Utils;
@@ -64,16 +65,19 @@ public class InputSizeReducerEstimator i
/**
* Determines the number of reducers to be used.
*
- * @param conf the job configuration
- * @param lds list of POLoads used in the jobs physical plan
* @param job job instance
+ * @param mapReduceOper
* @throws java.io.IOException
*/
@Override
- public int estimateNumberOfReducers(Configuration conf, List<POLoad> lds, Job job) throws IOException {
+ public int estimateNumberOfReducers(Job job, MapReduceOper mapReduceOper) throws IOException {
+ Configuration conf = job.getConfiguration();
+
long bytesPerReducer = conf.getLong(BYTES_PER_REDUCER_PARAM, DEFAULT_BYTES_PER_REDUCER);
int maxReducers = conf.getInt(MAX_REDUCER_COUNT_PARAM, DEFAULT_MAX_REDUCER_COUNT_PARAM);
- long totalInputFileSize = getTotalInputFileSize(conf, lds, job);
+
+ List<POLoad> poLoads = PlanHelper.getPhysicalOperators(mapReduceOper.mapPlan, POLoad.class);
+ long totalInputFileSize = getTotalInputFileSize(conf, poLoads, job);
log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+ maxReducers + " totalInputFileSize=" + totalInputFileSize);
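
The effect of this hunk is that the estimator is now self-contained: given a MapReduceOper, it looks up the POLoads in the map plan via PlanHelper instead of having the caller pass them in. For context, the sizing logic that follows divides the total input size by BYTES_PER_REDUCER_PARAM and rounds up, capped at MAX_REDUCER_COUNT_PARAM. A rough sketch consistent with the test expectations further down, not a verbatim copy of the class:

    // Rough sketch, assuming the 1GB-per-reducer default implied by the tests below.
    int reducers = (int) Math.ceil((double) totalInputFileSize / bytesPerReducer);
    reducers = Math.max(1, Math.min(reducers, maxReducers)); // at least 1, never above the cap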
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1372679&r1=1372678&r2=1372679&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Aug 14 00:08:37 2012
@@ -407,7 +407,7 @@ public class JobControlCompiler{
}
}
- adjustNumReducers(plan, mro, conf, nwJob);
+ adjustNumReducers(plan, mro, nwJob);
if(lds!=null && lds.size()>0){
for (POLoad ld : lds) {
@@ -763,13 +763,12 @@ public class JobControlCompiler{
* the number of partitions used in the sampler.
* @param plan the MR plan
* @param mro the MR operator
- * @param conf the configuration
* @param nwJob the current job
* @throws IOException
*/
- public void adjustNumReducers(MROperPlan plan, MapReduceOper mro, Configuration conf,
+ public void adjustNumReducers(MROperPlan plan, MapReduceOper mro,
org.apache.hadoop.mapreduce.Job nwJob) throws IOException {
- int jobParallelism = calculateRuntimeReducers(mro, conf, nwJob);
+ int jobParallelism = calculateRuntimeReducers(mro, nwJob);
if (mro.isSampler()) {
// We need to calculate the final number of reducers of the next job (order-by or skew-join)
@@ -778,7 +777,7 @@ public class JobControlCompiler{
// Here we use the same conf and Job to calculate the runtime #reducers of the next job
// which is fine as the statistics comes from the nextMro's POLoads
- int nPartitions = calculateRuntimeReducers(nextMro, conf, nwJob);
+ int nPartitions = calculateRuntimeReducers(nextMro, nwJob);
// set the runtime #reducer of the next job as the #partition
ParallelConstantVisitor visitor =
@@ -787,6 +786,8 @@ public class JobControlCompiler{
}
log.info("Setting Parallelism to " + jobParallelism);
+ Configuration conf = nwJob.getConfiguration();
+
// set various parallelism into the job conf for later analysis, PIG-2779
conf.setInt("pig.info.reducers.default.parallel", pigContext.defaultParallel);
conf.setInt("pig.info.reducers.requested.parallel", mro.requestedParallelism);
@@ -805,13 +806,12 @@ public class JobControlCompiler{
* @return the runtimeParallelism
* @throws IOException
*/
- private int calculateRuntimeReducers(MapReduceOper mro, Configuration conf,
+ private int calculateRuntimeReducers(MapReduceOper mro,
org.apache.hadoop.mapreduce.Job nwJob) throws IOException{
// we don't recalculate for the same job
if (mro.runtimeParallelism != -1) {
return mro.runtimeParallelism;
}
- List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
int jobParallelism = -1;
@@ -820,7 +820,7 @@ public class JobControlCompiler{
} else if (pigContext.defaultParallel > 0) {
jobParallelism = pigContext.defaultParallel;
} else {
- mro.estimatedParallelism = estimateNumberOfReducers(conf, lds, nwJob);
+ mro.estimatedParallelism = estimateNumberOfReducers(nwJob, mro);
// reducer estimation could return -1 if it couldn't estimate
log.info("Could not estimate number of reducers and no requested or default " +
"parallelism set. Defaulting to 1 reducer.");
@@ -831,22 +831,25 @@ public class JobControlCompiler{
mro.runtimeParallelism = jobParallelism;
return jobParallelism;
}
+
/**
* Looks up the estimator from REDUCER_ESTIMATOR_KEY and invokes it to find the number of
* reducers to use. If REDUCER_ESTIMATOR_KEY isn't set, defaults to InputSizeReducerEstimator.
- * @param conf
- * @param lds
+ * @param job
+ * @param mapReducerOper
* @throws IOException
*/
- public static int estimateNumberOfReducers(Configuration conf, List<POLoad> lds,
- org.apache.hadoop.mapreduce.Job job) throws IOException {
+ public static int estimateNumberOfReducers(org.apache.hadoop.mapreduce.Job job,
+ MapReduceOper mapReducerOper) throws IOException {
+ Configuration conf = job.getConfiguration();
+
PigReducerEstimator estimator = conf.get(REDUCER_ESTIMATOR_KEY) == null ?
new InputSizeReducerEstimator() :
PigContext.instantiateObjectFromParams(conf,
REDUCER_ESTIMATOR_KEY, REDUCER_ESTIMATOR_ARG_KEY, PigReducerEstimator.class);
log.info("Using reducer estimator: " + estimator.getClass().getName());
- int numberOfReducers = estimator.estimateNumberOfReducers(conf, lds, job);
+ int numberOfReducers = estimator.estimateNumberOfReducers(job, mapReducerOper);
return numberOfReducers;
}
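
As the hunk above shows, JobControlCompiler still resolves the estimator from REDUCER_ESTIMATOR_KEY and falls back to InputSizeReducerEstimator; only the arguments handed to the estimator change. For illustration, a custom estimator would still be registered through the job configuration roughly like this (assuming REDUCER_ESTIMATOR_KEY is accessible from your code; MyEstimator is a placeholder class name, not part of this patch):

    // Hypothetical wiring: JobControlCompiler will instantiate this class via
    // PigContext.instantiateObjectFromParams when it estimates reducers.
    Configuration conf = nwJob.getConfiguration();
    conf.set(JobControlCompiler.REDUCER_ESTIMATOR_KEY, MyEstimator.class.getName());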
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java?rev=1372679&r1=1372678&r2=1372679&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java Tue Aug 14 00:08:37 2012
@@ -17,14 +17,11 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import java.io.IOException;
-import java.util.List;
/**
* Interface to implement when you want to use a custom approach to estimating
@@ -46,12 +43,10 @@ public interface PigReducerEstimator {
* Estimate the number of reducers for a given job based on the collection
* of load funcs passed.
*
- * @param conf the job configuration
- * @param poLoadList list of POLoads used in the jobs physical plan
* @param job job instance
+ * @param mapReduceOper map reducer operator of the job
* @return the number of reducers to use, or -1 if the count couldn't be estimated
* @throws IOException
*/
- public int estimateNumberOfReducers(Configuration conf, List<POLoad> poLoadList, Job job)
- throws IOException;
+ public int estimateNumberOfReducers(Job job, MapReduceOper mapReduceOper) throws IOException;
}
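
With the interface trimmed as above, an implementation only needs the Job and the MapReduceOper. A minimal sketch of a custom estimator against the new signature (the class and its fixed-count behaviour are illustrative only, not part of this patch):

    package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;

    // Illustrative estimator: returns a fixed reducer count from the job conf,
    // or -1 (the "couldn't estimate" contract value) when there is no map plan.
    public class FixedCountReducerEstimator implements PigReducerEstimator {
        @Override
        public int estimateNumberOfReducers(Job job, MapReduceOper mapReduceOper) throws IOException {
            if (mapReduceOper == null || mapReduceOper.mapPlan == null) {
                return -1;
            }
            return job.getConfiguration().getInt("my.fixed.reducers", 4); // hypothetical property
        }
    }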
Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1372679&r1=1372678&r2=1372679&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Tue Aug 14 00:08:37 2012
@@ -42,12 +42,12 @@ import javax.tools.JavaFileObject;
import javax.tools.StandardJavaFileManager;
import javax.tools.ToolProvider;
-import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.LoadFunc;
@@ -134,20 +134,20 @@ public class TestJobControlCompiler {
@Test
public void testEstimateNumberOfReducers() throws Exception {
- Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(CONF,
- Lists.newArrayList(createPOLoadWithSize(2L * 1000 * 1000 * 999,
- new PigStorage())),
- new org.apache.hadoop.mapreduce.Job(CONF)));
-
- Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(CONF,
- Lists.newArrayList(createPOLoadWithSize(2L * 1000 * 1000 * 1000,
- new PigStorage())),
- new org.apache.hadoop.mapreduce.Job(CONF)));
-
- Assert.assertEquals(3, JobControlCompiler.estimateNumberOfReducers(CONF,
- Lists.newArrayList(createPOLoadWithSize(2L * 1000 * 1000 * 1001,
- new PigStorage())),
- new org.apache.hadoop.mapreduce.Job(CONF)));
+ Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(
+ new Job(CONF), createMockPOLoadMapReduceOper(2L * 1000 * 1000 * 999)));
+
+ Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(
+ new Job(CONF), createMockPOLoadMapReduceOper(2L * 1000 * 1000 * 1000)));
+
+ Assert.assertEquals(3, JobControlCompiler.estimateNumberOfReducers(
+ new Job(CONF), createMockPOLoadMapReduceOper(2L * 1000 * 1000 * 1001)));
+ }
+
+ private static MapReduceOper createMockPOLoadMapReduceOper(long size) throws Exception {
+ MapReduceOper mro = new MapReduceOper(new OperatorKey());
+ mro.mapPlan.add(createPOLoadWithSize(size, new PigStorage()));
+ return mro;
}
public static POLoad createPOLoadWithSize(long size, LoadFunc loadFunc) throws Exception {