You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2010/07/29 02:24:38 UTC
svn commit: r980274 - in /hadoop/pig/trunk: CHANGES.txt
conf/pig-default.properties conf/pig.properties
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
test/org/apache/pig/test/TestJobSubmission.java
Author: olga
Date: Thu Jul 29 00:24:37 2010
New Revision: 980274
URL: http://svn.apache.org/viewvc?rev=980274&view=rev
Log:
PIG-1249: Safe-guards against misconfigured Pig scripts without PARALLEL keyword (zjffdu via olgan)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/conf/pig-default.properties
hadoop/pig/trunk/conf/pig.properties
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=980274&r1=980273&r2=980274&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Jul 29 00:24:37 2010
@@ -22,6 +22,8 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES
+PIG-1249: Safe-guards against misconfigured Pig scripts without PARALLEL keyword (zjffdu via olgan)
+
IMPROVEMENTS
PIG-928: UDFs in scripting languages (daijy)
Modified: hadoop/pig/trunk/conf/pig-default.properties
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/conf/pig-default.properties?rev=980274&r1=980273&r2=980274&view=diff
==============================================================================
--- hadoop/pig/trunk/conf/pig-default.properties (original)
+++ hadoop/pig/trunk/conf/pig-default.properties Thu Jul 29 00:24:37 2010
@@ -21,3 +21,7 @@ pig.spill.size.threshold=5000000
#EXPERIMENT: Activate garbage collection when spilling a file bigger than this size (bytes)
#This should help reduce the number of files being spilled.
pig.spill.gc.activation.size=40000000
+
+#The following two parameters are used to estimate the number of reducers when the script sets neither PARALLEL nor a default parallelism
+pig.exec.reducers.bytes.per.reducer=1000000000
+pig.exec.reducers.max=999
\ No newline at end of file
Modified: hadoop/pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/conf/pig.properties?rev=980274&r1=980273&r2=980274&view=diff
==============================================================================
--- hadoop/pig/trunk/conf/pig.properties (original)
+++ hadoop/pig/trunk/conf/pig.properties Thu Jul 29 00:24:37 2010
@@ -22,3 +22,7 @@
#EXPERIMENT: Activate garbage collection when spilling a file bigger than this size (bytes)
#This should help reduce the number of files being spilled.
#pig.spill.gc.activation.size=40000000
+
+#The following two parameters are used to estimate the number of reducers when the script sets neither PARALLEL nor a default parallelism
+#pig.exec.reducers.bytes.per.reducer=1000000000
+#pig.exec.reducers.max=999
\ No newline at end of file
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=980274&r1=980273&r2=980274&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Jul 29 00:24:37 2010
@@ -339,10 +339,6 @@ public class JobControlCompiler{
ss.addSettingsToConf(mro, conf);
}
- //Set the User Name for this job. This will be
- //used as the working directory
- if (pigContext.defaultParallel > 0)
- conf.set("mapred.reduce.tasks", ""+pigContext.defaultParallel);
conf.set("mapred.mapper.new-api", "true");
conf.set("mapred.reducer.new-api", "true");
@@ -533,8 +529,15 @@ public class JobControlCompiler{
mro.reducePlan.remove(pack);
nwJob.setMapperClass(PigMapReduce.Map.class);
nwJob.setReducerClass(PigMapReduce.Reduce.class);
- if (mro.requestedParallelism>0)
+
+ // Use PARALLEL from the query if set, otherwise defaultParallel from PigContext; if neither is set, estimate the number of reducers
+ if (mro.requestedParallelism > 0)
nwJob.setNumReduceTasks(mro.requestedParallelism);
+ else if (pigContext.defaultParallel > 0)
+ conf.set("mapred.reduce.tasks", ""+pigContext.defaultParallel);
+ else
+ estimateNumberOfReducers(conf,lds);
+
if (mro.customPartitioner != null)
nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
@@ -642,6 +645,75 @@ public class JobControlCompiler{
}
}
+ /**
+ * Currently the estimation of reducer number is only applied to HDFS, The estimation is based on the input size of data storage on HDFS.
+ * Two parameters can been configured for the estimation, one is pig.exec.reducers.max which constrain the maximum number of reducer task (default is 999). The other
+ * is pig.exec.reducers.bytes.per.reducer(default value is 1000*1000*1000) which means the how much data can been handled for each reducer.
+ * e.g. the following is your pig script
+ * a = load '/data/a';
+ * b = load '/data/b';
+ * c = join a by $0, b by $0;
+ * store c into '/tmp';
+ *
+ * The size of /data/a is 1000*1000*1000, and size of /data/b is 2*1000*1000*1000.
+ * Then the estimated reducer number is (1000*1000*1000+2*1000*1000*1000)/(1000*1000*1000)=3
+ * @param conf
+ * @param lds
+ * @throws IOException
+ */
+ private void estimateNumberOfReducers(Configuration conf, List<POLoad> lds) throws IOException {
+ long bytesPerReducer = conf.getLong("pig.exec.reducers.bytes.per.reducer", (long) (1000 * 1000 * 1000));
+ int maxReducers = conf.getInt("pig.exec.reducers.max", 999);
+ long totalInputFileSize = getTotalInputFileSize(lds);
+
+ log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+ + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+
+ int reducers = (int)Math.ceil((totalInputFileSize+0.0) / bytesPerReducer);
+ reducers = Math.max(1, reducers);
+ reducers = Math.min(maxReducers, reducers);
+ conf.setInt("mapred.reduce.tasks", reducers);
+
+ log.info("Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to " + reducers);
+ }
+
+    /**
+     * Sums the sizes of the inputs read by the given load operators.
+     * <p>
+     * Only locations with no URI scheme, an {@code hdfs} scheme or a {@code file} scheme are
+     * measured; any other location (e.g. {@code hbase://}) is skipped. Each location is expanded
+     * with {@code globStatus}, and directories are measured recursively. A location whose glob
+     * result is null adds nothing.
+     *
+     * @param lds load operators of the job; may be null or empty
+     * @return total size in bytes of the measured inputs, 0 if there are none
+     * @throws IOException if a file system cannot be obtained or queried
+     */
+    private long getTotalInputFileSize(List<POLoad> lds) throws IOException {
+        long size = 0;
+        if (lds == null) {
+            return size;
+        }
+        for (POLoad ld : lds) {
+            Path path = new Path(ld.getLFile().getFileName());
+            String scheme = path.toUri().getScheme();
+            if (scheme != null && !scheme.equalsIgnoreCase("hdfs") && !scheme.equalsIgnoreCase("file")) {
+                continue;
+            }
+            // Resolve the file system from the path itself: the default file system rejects paths
+            // with a different scheme or authority (e.g. file:/ when the default is HDFS).
+            FileSystem fs = path.getFileSystem(conf);
+            FileStatus[] status = fs.globStatus(path);
+            if (status != null) {
+                for (FileStatus s : status) {
+                    size += getPathLength(fs, s);
+                }
+            }
+        }
+        return size;
+    }
+
+    /**
+     * Returns the length of a file, or the total length of all files below a directory.
+     *
+     * @param fs     file system that {@code status} belongs to
+     * @param status file or directory to measure
+     * @return length in bytes; a directory that can no longer be listed counts as 0
+     * @throws IOException if a directory cannot be listed
+     */
+    private long getPathLength(FileSystem fs, FileStatus status) throws IOException {
+        if (!status.isDir()) {
+            return status.getLen();
+        }
+        long size = 0;
+        FileStatus[] children = fs.listStatus(status.getPath());
+        // listStatus can return null when the path does not exist (e.g. removed after globbing).
+        if (children != null) {
+            for (FileStatus child : children) {
+                size += getPathLength(fs, child);
+            }
+        }
+        return size;
+    }
+
public static class PigSecondaryKeyGroupComparator extends WritableComparator {
public PigSecondaryKeyGroupComparator() {
// super(TupleFactory.getInstance().tupleClass(), true);
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=980274&r1=980273&r2=980274&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Thu Jul 29 00:24:37 2010
@@ -545,6 +545,68 @@ public class TestJobSubmission extends j
pc.defaultParallel = -1;
}
+    @Test
+    public void testReducerNumEstimation() throws Exception {
+        Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", "/passwd");
+
+        // pc is mutated by other tests in this class too (they reset pc.defaultParallel), so save
+        // the estimation settings and restore them when this test finishes.
+        String oldBytesPerReducer = pc.getConf().getProperty("pig.exec.reducers.bytes.per.reducer");
+        String oldMaxReducers = pc.getConf().getProperty("pig.exec.reducers.max");
+        try {
+            // Without PARALLEL the number of reducers is estimated from the input size:
+            // ceil(size / 100), capped at 10.
+            long expected = Math.min((long) Math.ceil(
+                    new File("test/org/apache/pig/test/data/passwd").length() / 100.0), 10);
+            assertEquals(expected, compileAndGetReducerNum(
+                    "a = load '/passwd';", "b = group a by $0;"));
+
+            // The PARALLEL keyword overrides the estimated number of reducers.
+            assertEquals(2, compileAndGetReducerNum(
+                    "a = load '/passwd';", "b = group a by $0 PARALLEL 2;"));
+
+            // Inputs that are not on HDFS or the local file system (such as hbase) are not
+            // measured, so the measured size is 0 and the estimate falls back to 1 reducer.
+            assertEquals(1, compileAndGetReducerNum(
+                    "a = load 'hbase://passwd' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');",
+                    "b = group a by $0 ;"));
+        } finally {
+            restoreProperty("pig.exec.reducers.bytes.per.reducer", oldBytesPerReducer);
+            restoreProperty("pig.exec.reducers.max", oldMaxReducers);
+        }
+    }
+
+    /**
+     * Compiles a load statement followed by a group statement with
+     * pig.exec.reducers.bytes.per.reducer=100 and pig.exec.reducers.max=10, and returns
+     * mapred.reduce.tasks of the first waiting job, or -1 if that key is absent.
+     */
+    private long compileAndGetReducerNum(String loadStatement, String groupStatement) throws Exception {
+        LogicalPlanTester planTester = new LogicalPlanTester(pc);
+        planTester.buildPlan(loadStatement);
+        LogicalPlan lp = planTester.buildPlan(groupStatement);
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        POStore store = GenPhyOp.dummyPigStorageOp();
+        pp.addAsLeaf(store);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+        pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
+        pc.getConf().setProperty("pig.exec.reducers.max", "10");
+        HExecutionEngine exe = pc.getExecutionEngine();
+        ConfigurationValidator.validatePigProperties(exe.getConfiguration());
+        Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
+        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+        JobControl jc = jcc.compile(mrPlan, "Test");
+        Job job = jc.getWaitingJobs().get(0);
+        // Use a sentinel default so a missing setting cannot coincide with an expected value.
+        return job.getJobConf().getLong("mapred.reduce.tasks", -1);
+    }
+
+    /** Restores a PigContext property to a saved value, removing it if it was previously unset. */
+    private void restoreProperty(String key, String value) {
+        if (value == null) {
+            pc.getConf().remove(key);
+        } else {
+            pc.getConf().setProperty(key, value);
+        }
+    }
+
private void submit() throws Exception{
assertEquals(true, FileLocalizer.fileExists(hadoopLdFile, pc));
MapReduceLauncher mrl = new MapReduceLauncher();