You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by bi...@apache.org on 2012/04/16 07:47:15 UTC

svn commit: r1326492 - in /pig/trunk: ./ conf/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/impl/ test/org/apache/pig/test/

Author: billgraham
Date: Mon Apr 16 05:47:14 2012
New Revision: 1326492

URL: http://svn.apache.org/viewvc?rev=1326492&view=rev
Log:
PIG-2574 Make reducer estimator pluggable

Modified:
    pig/trunk/.gitignore
    pig/trunk/conf/pig.properties
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/impl/PigContext.java
    pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java

Modified: pig/trunk/.gitignore
URL: http://svn.apache.org/viewvc/pig/trunk/.gitignore?rev=1326492&r1=1326491&r2=1326492&view=diff
==============================================================================
--- pig/trunk/.gitignore (original)
+++ pig/trunk/.gitignore Mon Apr 16 05:47:14 2012
@@ -2,6 +2,10 @@
 build/
 src-gen/
 test/org/apache/pig/test/utils/dotGraph/parser/
+target/
 ivy/*.jar
 pig.jar
 pig-withouthadoop.jar
+*.iml
+*.ipr
+*.iws

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1326492&r1=1326491&r2=1326492&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Mon Apr 16 05:47:14 2012
@@ -65,11 +65,26 @@ hcat.bin=/usr/local/hcat/bin/hcat
 ##### Set up optional Pig Progress Notification Listener ############
 
 # Note that only one PPNL can be set up. If you need several, write a PPNL that will chain them.
-# pig.notification.listener = <fully qualified class name o a PPNL implementation>
+# pig.notification.listener = <fully qualified class name of a PPNL implementation>
 
 # Optionally, you can supply a single String argument to pass to your PPNL. 
 # pig.notification.listener.arg = <somevalue>
 
 #####################################################################
 
+########## Override the default Reducer Estimator logic #############
+
+# By default, the logic to estimate the number of reducers to use for a given job lives in:
+#   org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator
+# This logic can be replaced by implementing the following interface:
+#   org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigReducerEstimator
+
+# This class will be invoked to estimate the number of reducers to use.
+# pig.exec.reducer.estimator = <fully qualified class name of a PigReducerEstimator implementation>
+
+# Optionally, you can supply a single String argument to pass to your PigReducerEstimator.
+# pig.exec.reducer.estimator.arg = <somevalue>
+
+#####################################################################
+
 #pig.load.default.statements=

Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1326492&r1=1326491&r2=1326492&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Mon Apr 16 05:47:14 2012
@@ -56,6 +56,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -644,18 +645,15 @@ static int run(String args[], PigProgres
 }
 
 protected static PigProgressNotificationListener makeListener(Properties properties) {
-    String className = properties.getProperty(PROGRESS_NOTIFICATION_LISTENER_KEY);
-    if (className != null) {
-        FuncSpec fs = null;
-        if (properties.containsKey(PROGRESS_NOTIFICATION_LISTENER_ARG_KEY)) {
-            fs = new FuncSpec(className,
-                    properties.getProperty(PROGRESS_NOTIFICATION_LISTENER_ARG_KEY));
-        } else {
-            fs = new FuncSpec(className);
-        }
-        return (PigProgressNotificationListener) PigContext.instantiateFuncFromSpec(fs);
-    } else {
-        return null;
+
+    try {
+        return PigContext.instantiateObjectFromParams(
+                    ConfigurationUtil.toConfiguration(properties),
+                    PROGRESS_NOTIFICATION_LISTENER_KEY,
+                    PROGRESS_NOTIFICATION_LISTENER_ARG_KEY,
+                    PigProgressNotificationListener.class);
+    } catch (ExecException e) {
+        throw new RuntimeException(e);
     }
 }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1326492&r1=1326491&r2=1326492&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Apr 16 05:47:14 2012
@@ -49,9 +49,7 @@ import org.apache.hadoop.mapred.jobcontr
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadMetadata;
 import org.apache.pig.PigException;
-import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
@@ -94,7 +92,6 @@ import org.apache.pig.impl.util.JarManag
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.util.UriUtil;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.ScriptState;
 
@@ -133,6 +130,9 @@ public class JobControlCompiler{
     public static final String LOG_DIR = "_logs";
 
     public static final String END_OF_INP_IN_MAP = "pig.invoke.close.in.map";
+
+    private static final String REDUCER_ESTIMATOR_KEY = "pig.exec.reducer.estimator";
+    private static final String REDUCER_ESTIMATOR_ARG_KEY =  "pig.exec.reducer.estimator.arg";
     
     /**
      * We will serialize the POStore(s) present in map and reduce in lists in
@@ -744,111 +744,26 @@ public class JobControlCompiler{
     }
 
     /**
-     * Estimate the number of reducers based on input size.
-     * Number of reducers is based on two properties:
-     * <ul>
-     *     <li>pig.exec.reducers.bytes.per.reducer -
-     *     how many bytes of input per reducer (default is 1000*1000*1000)</li>
-     *     <li>pig.exec.reducers.max -
-     *     constrain the maximum number of reducer task (default is 999)</li>
-     * </ul>
-     * If using a loader that implements LoadMetadata the reported input size is used, otherwise
-     * attempt to determine size from the filesystem.
-     * <p>
-     * e.g. the following is your pig script
-     * <pre>
-     * a = load '/data/a';
-     * b = load '/data/b';
-     * c = join a by $0, b by $0;
-     * store c into '/tmp';
-     * </pre>
-     * The size of /data/a is 1000*1000*1000, and size of /data/b is 2*1000*1000*1000.
-     * Then the estimated reducer number is (1000*1000*1000+2*1000*1000*1000)/(1000*1000*1000)=3
-     * </p>
-     *
-     * @param conf read settings from this configuration
-     * @param lds inputs to estimate number of reducers for
-     * @param job job configuration
-     * @throws IOException on error
-     * @return estimated number of reducers necessary for this input
+     * Looks up the estimator from REDUCER_ESTIMATOR_KEY and invokes it to find the number of
+     * reducers to use. If REDUCER_ESTIMATOR_KEY isn't set, defaults to InputSizeReducerEstimator
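+     * @param conf job configuration; mapred.reduce.tasks is set on it as a side effect
+     * @param lds the load operators of the job, passed through to the estimator
+     * @throws IOException if the estimator fails to compute an estimate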
      */
     public static int estimateNumberOfReducers(Configuration conf, List<POLoad> lds,
-            org.apache.hadoop.mapreduce.Job job) throws IOException {
-        long bytesPerReducer =
-                conf.getLong("pig.exec.reducers.bytes.per.reducer", (1000 * 1000 * 1000));
-        int maxReducers = conf.getInt("pig.exec.reducers.max", 999);
-
-        long totalInputFileSize = getInputSize(conf, lds, job);
-
-        log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
-            + maxReducers + " totalInputFileSize=" + totalInputFileSize);
-        
-        int reducers = (int)Math.ceil((totalInputFileSize+0.0) / bytesPerReducer);
-        reducers = Math.max(1, reducers);
-        reducers = Math.min(maxReducers, reducers);
-        conf.setInt("mapred.reduce.tasks", reducers);
-
-        log.info("Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to " + reducers);
-        return reducers;
-    }
-
-    /**
-     * Get the input size for as many inputs as possible. Inputs that do not report
-     * their size nor can pig look that up itself are excluded from this size.
-     */
-    public static long getInputSize(Configuration conf, List<POLoad> lds,
-            org.apache.hadoop.mapreduce.Job job) throws IOException {
-        long totalInputFileSize = 0;
-        for (POLoad ld : lds) {
-            long size = getInputSizeFromLoader(ld, job);
-            if (size > 0) {
-                totalInputFileSize += size;
-                continue;
-            }
-            // the input file location might be a list of comma separated files,
-            // separate them out
-            for (String location : LoadFunc.getPathStrings(ld.getLFile().getFileName())) {
-                if (UriUtil.isHDFSFileOrLocalOrS3N(location)) {
-                    Path path = new Path(location);
-                    FileSystem fs = path.getFileSystem(conf);
-                    FileStatus[] status = fs.globStatus(path);
-                    if (status != null) {
-                        for (FileStatus s : status) {
-                            totalInputFileSize += Utils.getPathLength(fs, s);
-                        }
-                    }
-                }
-            }
-        }
-        return totalInputFileSize;
-    }
-
-    /**
-     * Get the total input size in bytes by looking at statistics provided by
-     * loaders that implement @{link LoadMetadata}.
-     * @param ld
-     * @param job
-     * @return total input size in bytes, or 0 if unknown or incomplete
-     * @throws IOException on error
-     */
-    public static long getInputSizeFromLoader(
-            POLoad ld, org.apache.hadoop.mapreduce.Job job) throws IOException {
-        if (ld.getLoadFunc() == null
-                || !(ld.getLoadFunc() instanceof LoadMetadata)
-                || ld.getLFile() == null
-                || ld.getLFile().getFileName() == null) {
-            return 0;
-        }
-
-        ResourceStatistics statistics =
-                ((LoadMetadata) ld.getLoadFunc())
-                        .getStatistics(ld.getLFile().getFileName(), job);
-
-        if (statistics == null || statistics.getSizeInBytes() == null) {
-            return 0;
-        }
-
-        return statistics.getSizeInBytes();
+                                        org.apache.hadoop.mapreduce.Job job) throws IOException {
+        PigReducerEstimator estimator = conf.get(REDUCER_ESTIMATOR_KEY) == null ?
+          new InputSizeReducerEstimator() :
+          PigContext.instantiateObjectFromParams(conf,
+                  REDUCER_ESTIMATOR_KEY, REDUCER_ESTIMATOR_ARG_KEY, PigReducerEstimator.class);
+
+        log.info("Using reducer estimator: " + estimator.getClass().getName());
+        int numberOfReducers = estimator.estimateNumberOfReducers(conf, lds, job);
+        conf.setInt("mapred.reduce.tasks", numberOfReducers);
+
+        log.info("Neither PARALLEL nor default parallelism is set for this job. Setting number of "
+                + "reducers to " + numberOfReducers);
+        return numberOfReducers;
     }
 
     public static class PigSecondaryKeyGroupComparator extends WritableComparator {

Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1326492&r1=1326491&r2=1326492&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Mon Apr 16 05:47:14 2012
@@ -38,6 +38,7 @@ import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Level;
 import org.apache.pig.ExecType;
@@ -498,8 +499,45 @@ public class PigContext implements Seria
         String msg = "Could not resolve " + name + " using imports: " + packageImportList.get();
         throw new ExecException(msg, errCode, PigException.INPUT);
     }
-    
-    
+
+    /**
+     * A common Pig pattern for initializing objects via system properties is to support passing
+     * something like this on the command line:
+     * <code>-Dpig.notification.listener=MyClass</code>
+     * <code>-Dpig.notification.listener.arg=myConstructorStringArg</code>
+     *
+     * This method will properly initialize the class with the args, if they exist.
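+     * @param conf the configuration from which both properties are read
+     * @param classParamKey the property used to identify the class
+     * @param argParamKey the property holding an optional single String constructor argument
+     * @param clazz the type the instantiated object is expected to have
+     * @return the new instance cast to clazz, or null if classParamKey is not set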
+     */
+    public static <T> T instantiateObjectFromParams(Configuration conf,
+                                                    String classParamKey,
+                                                    String argParamKey,
+                                                    Class<T> clazz) throws ExecException {
+      String className = conf.get(classParamKey);
+
+      if (className != null) {
+          FuncSpec fs;
+          if (conf.get(argParamKey) != null) {
+              fs = new FuncSpec(className, conf.get(argParamKey));
+          } else {
+              fs = new FuncSpec(className);
+          }
+          try {
+            return clazz.cast(PigContext.instantiateFuncFromSpec(fs));
+          }
+          catch (ClassCastException e) {
+              throw new ExecException("The class defined by " + classParamKey +
+                      " in conf is not of type " + clazz.getName(), e);
+          }
+      } else {
+          return null;
+      }
+    }
+
     @SuppressWarnings({ "unchecked", "rawtypes" })
     public static Object instantiateFuncFromSpec(FuncSpec funcSpec)  {
         Object ret;

Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1326492&r1=1326491&r2=1326492&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Mon Apr 16 05:47:14 2012
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.pig.test;
 
 import java.io.File;
@@ -133,35 +150,7 @@ public class TestJobControlCompiler {
                 new org.apache.hadoop.mapreduce.Job(CONF)));
     }
 
-    @Test
-    public void testGetInputSizeFromFs() throws Exception {
-        long size = 2L * 1024 * 1024 * 1024;
-        Assert.assertEquals(size, JobControlCompiler.getInputSize(
-                CONF, Lists.newArrayList(createPOLoadWithSize(size, new PigStorage())),
-                new org.apache.hadoop.mapreduce.Job(CONF)));
-
-        Assert.assertEquals(size, JobControlCompiler.getInputSize(
-                CONF,
-                Lists.newArrayList(createPOLoadWithSize(size, new PigStorageWithStatistics())),
-                new org.apache.hadoop.mapreduce.Job(CONF)));
-
-        Assert.assertEquals(size * 2, JobControlCompiler.getInputSize(
-                CONF,
-                Lists.newArrayList(
-                        createPOLoadWithSize(size, new PigStorage()),
-                        createPOLoadWithSize(size, new PigStorageWithStatistics())),
-                new org.apache.hadoop.mapreduce.Job(CONF)));
-    }
-
-    @Test
-    public void testGetInputSizeFromLoader() throws Exception {
-        long size = 2L * 1024 * 1024 * 1024;
-        Assert.assertEquals(size, JobControlCompiler.getInputSizeFromLoader(
-                createPOLoadWithSize(size, new PigStorageWithStatistics()),
-                new org.apache.hadoop.mapreduce.Job(CONF)));
-    }
-
-    private static POLoad createPOLoadWithSize(long size, LoadFunc loadFunc) throws Exception {
+    public static POLoad createPOLoadWithSize(long size, LoadFunc loadFunc) throws Exception {
         File file = File.createTempFile("tempFile", ".tmp");
         file.deleteOnExit();
         RandomAccessFile f = new RandomAccessFile(file, "rw");