Posted to commits@pig.apache.org by ol...@apache.org on 2010/07/29 02:24:38 UTC

svn commit: r980274 - in /hadoop/pig/trunk: CHANGES.txt conf/pig-default.properties conf/pig.properties src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java test/org/apache/pig/test/TestJobSubmission.java

Author: olga
Date: Thu Jul 29 00:24:37 2010
New Revision: 980274

URL: http://svn.apache.org/viewvc?rev=980274&view=rev
Log:
PIG-1249: Safe-guards against misconfigured Pig scripts without PARALLEL keyword (zjffdu via olgan)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/conf/pig-default.properties
    hadoop/pig/trunk/conf/pig.properties
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=980274&r1=980273&r2=980274&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Jul 29 00:24:37 2010
@@ -22,6 +22,8 @@ Trunk (unreleased changes)
 
 INCOMPATIBLE CHANGES
 
+PIG-1249: Safe-guards against misconfigured Pig scripts without PARALLEL keyword (zjffdu via olgan)
+
 IMPROVEMENTS
 
 PIG-928: UDFs in scripting languages (daijy)

Modified: hadoop/pig/trunk/conf/pig-default.properties
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/conf/pig-default.properties?rev=980274&r1=980273&r2=980274&view=diff
==============================================================================
--- hadoop/pig/trunk/conf/pig-default.properties (original)
+++ hadoop/pig/trunk/conf/pig-default.properties Thu Jul 29 00:24:37 2010
@@ -21,3 +21,7 @@ pig.spill.size.threshold=5000000
 #EXPERIMENT: Activate garbage collection when spilling a file bigger than this size (bytes)
 #This should help reduce the number of files being spilled.
 pig.spill.gc.activation.size=40000000
+
+#the following two parameters are to help estimate the reducer number
+pig.exec.reducers.bytes.per.reducer=1000000000
+pig.exec.reducers.max=999
\ No newline at end of file
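
For a rough sense of how these two settings interact (a hedged, standalone sketch, not part of the patch; the committed logic is in JobControlCompiler.estimateNumberOfReducers below): the estimate is the total input size divided by pig.exec.reducers.bytes.per.reducer, rounded up, then clamped to at least 1 and at most pig.exec.reducers.max.

public class ReducerEstimateSketch {
    // Mirrors the estimation rule introduced by this patch (illustrative only).
    static int estimate(long totalInputBytes, long bytesPerReducer, int maxReducers) {
        int reducers = (int) Math.ceil(totalInputBytes / (double) bytesPerReducer);
        reducers = Math.max(1, reducers);       // never drop below one reducer
        return Math.min(maxReducers, reducers); // cap at pig.exec.reducers.max
    }

    public static void main(String[] args) {
        // 3 GB of input with the default 1 GB per reducer and max 999 -> 3 reducers
        System.out.println(estimate(3L * 1000 * 1000 * 1000, 1000L * 1000 * 1000, 999));
    }
}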

Modified: hadoop/pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/conf/pig.properties?rev=980274&r1=980273&r2=980274&view=diff
==============================================================================
--- hadoop/pig/trunk/conf/pig.properties (original)
+++ hadoop/pig/trunk/conf/pig.properties Thu Jul 29 00:24:37 2010
@@ -22,3 +22,7 @@
 #EXPERIMENT: Activate garbage collection when spilling a file bigger than this size (bytes)
 #This should help reduce the number of files being spilled.
 #pig.spill.gc.activation.size=40000000
+
+#the following two parameters are to help estimate the reducer number
+#pig.exec.reducers.bytes.per.reducer=1000000000
+#pig.exec.reducers.max=999
\ No newline at end of file

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=980274&r1=980273&r2=980274&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Jul 29 00:24:37 2010
@@ -339,10 +339,6 @@ public class JobControlCompiler{
             ss.addSettingsToConf(mro, conf);
         }
         
-        //Set the User Name for this job. This will be
-        //used as the working directory
-        if (pigContext.defaultParallel > 0)
-            conf.set("mapred.reduce.tasks", ""+pigContext.defaultParallel);
  
         conf.set("mapred.mapper.new-api", "true");
         conf.set("mapred.reducer.new-api", "true");
@@ -533,8 +529,15 @@ public class JobControlCompiler{
                 mro.reducePlan.remove(pack);
                 nwJob.setMapperClass(PigMapReduce.Map.class);
                 nwJob.setReducerClass(PigMapReduce.Reduce.class);
-                if (mro.requestedParallelism>0)
+                
+                // first check PARALLEL in the query, then defaultParallel in PigContext, and finally fall back to estimation
+                if (mro.requestedParallelism > 0)
                     nwJob.setNumReduceTasks(mro.requestedParallelism);
+                else if (pigContext.defaultParallel > 0)
+                    conf.set("mapred.reduce.tasks", ""+pigContext.defaultParallel);
+                else
+                    estimateNumberOfReducers(conf,lds);
+                
                 if (mro.customPartitioner != null)
                 	nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
 
@@ -642,6 +645,75 @@ public class JobControlCompiler{
         }
     }
     
+    /**
+     * Currently the estimation of the reducer number is only applied to HDFS; the estimate is based on the size of the input data stored on HDFS.
+     * Two parameters can be configured for the estimation: pig.exec.reducers.max, which caps the number of reducer tasks (default is 999), and
+     * pig.exec.reducers.bytes.per.reducer (default value is 1000*1000*1000), which is the amount of data each reducer should handle.
+     * e.g. given the following Pig script:
+     * a = load '/data/a';
+     * b = load '/data/b';
+     * c = join a by $0, b by $0;
+     * store c into '/tmp';
+     * 
+     * The size of /data/a is 1000*1000*1000, and the size of /data/b is 2*1000*1000*1000.
+     * Then the estimated reducer number is (1000*1000*1000+2*1000*1000*1000)/(1000*1000*1000)=3
+     * @param conf
+     * @param lds
+     * @throws IOException
+     */
+    private void estimateNumberOfReducers(Configuration conf, List<POLoad> lds) throws IOException {
+        long bytesPerReducer = conf.getLong("pig.exec.reducers.bytes.per.reducer", (long) (1000 * 1000 * 1000));
+        int maxReducers = conf.getInt("pig.exec.reducers.max", 999);
+        long totalInputFileSize = getTotalInputFileSize(lds);
+       
+        log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+            + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+        
+        int reducers = (int)Math.ceil((totalInputFileSize+0.0) / bytesPerReducer);
+        reducers = Math.max(1, reducers);
+        reducers = Math.min(maxReducers, reducers);
+        conf.setInt("mapred.reduce.tasks", reducers);
+
+        log.info("Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to " + reducers);
+    }
+
+    private long getTotalInputFileSize(List<POLoad> lds) throws IOException {
+        List<String> inputs = new ArrayList<String>();
+        if(lds!=null && lds.size()>0){
+            for (POLoad ld : lds) {
+                inputs.add(ld.getLFile().getFileName());
+            }
+        }
+        long size = 0;
+        FileSystem fs = FileSystem.get(conf);
+        for (String input : inputs){
+            Path path = new Path(input);
+            String scheme = path.toUri().getScheme();
+            if (scheme == null || scheme.equalsIgnoreCase("hdfs") || scheme.equalsIgnoreCase("file")){
+                FileStatus[] status=fs.globStatus(new Path(input));
+                if (status != null){
+                    for (FileStatus s : status){
+                       size += getPathLength(fs, s);
+                    }
+                }
+            }
+        }
+        return size;
+    }
+
+    private long getPathLength(FileSystem fs,FileStatus status) throws IOException{
+        if (!status.isDir()){
+            return status.getLen();
+        }else{
+            FileStatus[] children = fs.listStatus(status.getPath());
+            long size=0;
+            for (FileStatus child : children){
+                size +=getPathLength(fs, child);
+            }
+            return size;
+        }
+    }
+        
     public static class PigSecondaryKeyGroupComparator extends WritableComparator {
         public PigSecondaryKeyGroupComparator() {
 //            super(TupleFactory.getInstance().tupleClass(), true);
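
Taken together, the changes above pick the reducer count in a fixed order: an explicit PARALLEL in the script wins, then the session-level default parallel, and only when neither is set does the size-based estimate apply. A hedged sketch of that precedence (illustrative only, not code from the patch):

public class ReducerPrecedenceSketch {
    // Order of precedence mirrored from the JobControlCompiler change above.
    static int chooseReducers(int requestedParallelism, int defaultParallel, int estimatedReducers) {
        if (requestedParallelism > 0)
            return requestedParallelism;  // PARALLEL keyword in the query
        if (defaultParallel > 0)
            return defaultParallel;       // pigContext.defaultParallel ("set default_parallel")
        return estimatedReducers;         // fall back to the size-based estimate
    }

    public static void main(String[] args) {
        // PARALLEL 2 overrides a default of 10 and an estimate of 3 -> 2
        System.out.println(chooseReducers(2, 10, 3));
    }
}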

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=980274&r1=980273&r2=980274&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Thu Jul 29 00:24:37 2010
@@ -545,6 +545,68 @@ public class TestJobSubmission extends j
         pc.defaultParallel = -1;        
     }
 
+    @Test
+    public void testReducerNumEstimation() throws Exception{
+        // use the estimation
+        LogicalPlanTester planTester = new LogicalPlanTester(pc) ;
+        Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", "/passwd");
+        planTester.buildPlan("a = load '/passwd';");
+        LogicalPlan lp = planTester.buildPlan("b = group a by $0;");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        POStore store = GenPhyOp.dummyPigStorageOp();
+        pp.addAsLeaf(store);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+               
+        pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
+        pc.getConf().setProperty("pig.exec.reducers.max", "10");
+        HExecutionEngine exe = pc.getExecutionEngine();
+        ConfigurationValidator.validatePigProperties(exe.getConfiguration());
+        Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
+        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+        JobControl jc=jcc.compile(mrPlan, "Test");
+        Job job = jc.getWaitingJobs().get(0);
+        long reducer=Math.min((long)Math.ceil(new File("test/org/apache/pig/test/data/passwd").length()/100.0), 10);
+        assertEquals(job.getJobConf().getLong("mapred.reduce.tasks",10), reducer);
+        
+        // use the PARALLEL keyword; it overrides the estimated reducer number
+        planTester = new LogicalPlanTester(pc) ;
+        planTester.buildPlan("a = load '/passwd';");
+        lp = planTester.buildPlan("b = group a by $0 PARALLEL 2;");
+        pp = Util.buildPhysicalPlan(lp, pc);
+        store = GenPhyOp.dummyPigStorageOp();
+        pp.addAsLeaf(store);
+        mrPlan = Util.buildMRPlan(pp, pc);
+               
+        pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
+        pc.getConf().setProperty("pig.exec.reducers.max", "10");
+        exe = pc.getExecutionEngine();
+        ConfigurationValidator.validatePigProperties(exe.getConfiguration());
+        conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
+        jcc = new JobControlCompiler(pc, conf);
+        jc=jcc.compile(mrPlan, "Test");
+        job = jc.getWaitingJobs().get(0);
+        assertEquals(job.getJobConf().getLong("mapred.reduce.tasks",10), 2);
+        
+        // the estimation does not take effect for non-DFS storage or when the files do not exist, such as HBase
+        planTester = new LogicalPlanTester(pc) ;
+        planTester.buildPlan("a = load 'hbase://passwd' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');");
+        lp = planTester.buildPlan("b = group a by $0 ;");
+        pp = Util.buildPhysicalPlan(lp, pc);
+        store = GenPhyOp.dummyPigStorageOp();
+        pp.addAsLeaf(store);
+        mrPlan = Util.buildMRPlan(pp, pc);
+                
+        pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
+        pc.getConf().setProperty("pig.exec.reducers.max", "10");
+        exe = pc.getExecutionEngine();
+        ConfigurationValidator.validatePigProperties(exe.getConfiguration());
+        conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
+        jcc = new JobControlCompiler(pc, conf);
+        jc=jcc.compile(mrPlan, "Test");
+        job = jc.getWaitingJobs().get(0);
+        assertEquals(job.getJobConf().getLong("mapred.reduce.tasks",10), 1);
+    }
+    
     private void submit() throws Exception{
         assertEquals(true, FileLocalizer.fileExists(hadoopLdFile, pc));
         MapReduceLauncher mrl = new MapReduceLauncher();