You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by bi...@apache.org on 2012/04/16 07:47:15 UTC
svn commit: r1326492 - in /pig/trunk: ./ conf/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/impl/ test/org/apache/pig/test/
Author: billgraham
Date: Mon Apr 16 05:47:14 2012
New Revision: 1326492
URL: http://svn.apache.org/viewvc?rev=1326492&view=rev
Log:
PIG-2574 Make reducer estimator plugable
Modified:
pig/trunk/.gitignore
pig/trunk/conf/pig.properties
pig/trunk/src/org/apache/pig/Main.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/impl/PigContext.java
pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
Modified: pig/trunk/.gitignore
URL: http://svn.apache.org/viewvc/pig/trunk/.gitignore?rev=1326492&r1=1326491&r2=1326492&view=diff
==============================================================================
--- pig/trunk/.gitignore (original)
+++ pig/trunk/.gitignore Mon Apr 16 05:47:14 2012
@@ -2,6 +2,10 @@
build/
src-gen/
test/org/apache/pig/test/utils/dotGraph/parser/
+target/
ivy/*.jar
pig.jar
pig-withouthadoop.jar
+*.iml
+*.ipr
+*.iws
Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1326492&r1=1326491&r2=1326492&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Mon Apr 16 05:47:14 2012
@@ -65,11 +65,26 @@ hcat.bin=/usr/local/hcat/bin/hcat
##### Set up optional Pig Progress Notification Listener ############
# Note that only one PPNL can be set up. If you need several, write a PPNL that will chain them.
-# pig.notification.listener = <fully qualified class name o a PPNL implementation>
+# pig.notification.listener = <fully qualified class name of a PPNL implementation>
# Optionally, you can supply a single String argument to pass to your PPNL.
# pig.notification.listener.arg = <somevalue>
#####################################################################
+########## Override the default Reducer Estimator logic #############
+
+# By default, the logic to estimate the number of reducers to use for a given job lives in:
+# org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator
+# This logic can be replaced by implementing the following interface:
+# org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigReducerEstimator
+
+# This class will be invoked to estimate the number of reducers to use.
+# pig.exec.reducer.estimator = <fully qualified class name of a PigReducerEstimator implementation>
+
+# Optionally, you can supply a single String argument to pass to your PigReducerEstimator.
+# pig.exec.reducer.estimator.arg = <somevalue>
+
+#####################################################################
+
#pig.load.default.statements=
Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1326492&r1=1326491&r2=1326492&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Mon Apr 16 05:47:14 2012
@@ -56,6 +56,7 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
@@ -644,18 +645,15 @@ static int run(String args[], PigProgres
}
protected static PigProgressNotificationListener makeListener(Properties properties) {
- String className = properties.getProperty(PROGRESS_NOTIFICATION_LISTENER_KEY);
- if (className != null) {
- FuncSpec fs = null;
- if (properties.containsKey(PROGRESS_NOTIFICATION_LISTENER_ARG_KEY)) {
- fs = new FuncSpec(className,
- properties.getProperty(PROGRESS_NOTIFICATION_LISTENER_ARG_KEY));
- } else {
- fs = new FuncSpec(className);
- }
- return (PigProgressNotificationListener) PigContext.instantiateFuncFromSpec(fs);
- } else {
- return null;
+
+ try {
+ return PigContext.instantiateObjectFromParams(
+ ConfigurationUtil.toConfiguration(properties),
+ PROGRESS_NOTIFICATION_LISTENER_KEY,
+ PROGRESS_NOTIFICATION_LISTENER_ARG_KEY,
+ PigProgressNotificationListener.class);
+ } catch (ExecException e) {
+ throw new RuntimeException(e);
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1326492&r1=1326491&r2=1326492&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Apr 16 05:47:14 2012
@@ -49,9 +49,7 @@ import org.apache.hadoop.mapred.jobcontr
import org.apache.pig.ComparisonFunc;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadMetadata;
import org.apache.pig.PigException;
-import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
@@ -94,7 +92,6 @@ import org.apache.pig.impl.util.JarManag
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.util.UriUtil;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.ScriptState;
@@ -133,6 +130,9 @@ public class JobControlCompiler{
public static final String LOG_DIR = "_logs";
public static final String END_OF_INP_IN_MAP = "pig.invoke.close.in.map";
+
+ private static final String REDUCER_ESTIMATOR_KEY = "pig.exec.reducer.estimator";
+ private static final String REDUCER_ESTIMATOR_ARG_KEY = "pig.exec.reducer.estimator.arg";
/**
* We will serialize the POStore(s) present in map and reduce in lists in
@@ -744,111 +744,26 @@ public class JobControlCompiler{
}
/**
- * Estimate the number of reducers based on input size.
- * Number of reducers is based on two properties:
- * <ul>
- * <li>pig.exec.reducers.bytes.per.reducer -
- * how many bytes of input per reducer (default is 1000*1000*1000)</li>
- * <li>pig.exec.reducers.max -
- * constrain the maximum number of reducer task (default is 999)</li>
- * </ul>
- * If using a loader that implements LoadMetadata the reported input size is used, otherwise
- * attempt to determine size from the filesystem.
- * <p>
- * e.g. the following is your pig script
- * <pre>
- * a = load '/data/a';
- * b = load '/data/b';
- * c = join a by $0, b by $0;
- * store c into '/tmp';
- * </pre>
- * The size of /data/a is 1000*1000*1000, and size of /data/b is 2*1000*1000*1000.
- * Then the estimated reducer number is (1000*1000*1000+2*1000*1000*1000)/(1000*1000*1000)=3
- * </p>
- *
- * @param conf read settings from this configuration
- * @param lds inputs to estimate number of reducers for
- * @param job job configuration
- * @throws IOException on error
- * @return estimated number of reducers necessary for this input
+ * Looks up the estimator from REDUCER_ESTIMATOR_KEY and invokes it to find the number of
+ * reducers to use. If REDUCER_ESTIMATOR_KEY isn't set, defaults to InputSizeReducerEstimator
+ * @param conf
+ * @param lds
+ * @throws IOException
*/
public static int estimateNumberOfReducers(Configuration conf, List<POLoad> lds,
- org.apache.hadoop.mapreduce.Job job) throws IOException {
- long bytesPerReducer =
- conf.getLong("pig.exec.reducers.bytes.per.reducer", (1000 * 1000 * 1000));
- int maxReducers = conf.getInt("pig.exec.reducers.max", 999);
-
- long totalInputFileSize = getInputSize(conf, lds, job);
-
- log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
- + maxReducers + " totalInputFileSize=" + totalInputFileSize);
-
- int reducers = (int)Math.ceil((totalInputFileSize+0.0) / bytesPerReducer);
- reducers = Math.max(1, reducers);
- reducers = Math.min(maxReducers, reducers);
- conf.setInt("mapred.reduce.tasks", reducers);
-
- log.info("Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to " + reducers);
- return reducers;
- }
-
- /**
- * Get the input size for as many inputs as possible. Inputs that do not report
- * their size nor can pig look that up itself are excluded from this size.
- */
- public static long getInputSize(Configuration conf, List<POLoad> lds,
- org.apache.hadoop.mapreduce.Job job) throws IOException {
- long totalInputFileSize = 0;
- for (POLoad ld : lds) {
- long size = getInputSizeFromLoader(ld, job);
- if (size > 0) {
- totalInputFileSize += size;
- continue;
- }
- // the input file location might be a list of comma separated files,
- // separate them out
- for (String location : LoadFunc.getPathStrings(ld.getLFile().getFileName())) {
- if (UriUtil.isHDFSFileOrLocalOrS3N(location)) {
- Path path = new Path(location);
- FileSystem fs = path.getFileSystem(conf);
- FileStatus[] status = fs.globStatus(path);
- if (status != null) {
- for (FileStatus s : status) {
- totalInputFileSize += Utils.getPathLength(fs, s);
- }
- }
- }
- }
- }
- return totalInputFileSize;
- }
-
- /**
- * Get the total input size in bytes by looking at statistics provided by
- * loaders that implement @{link LoadMetadata}.
- * @param ld
- * @param job
- * @return total input size in bytes, or 0 if unknown or incomplete
- * @throws IOException on error
- */
- public static long getInputSizeFromLoader(
- POLoad ld, org.apache.hadoop.mapreduce.Job job) throws IOException {
- if (ld.getLoadFunc() == null
- || !(ld.getLoadFunc() instanceof LoadMetadata)
- || ld.getLFile() == null
- || ld.getLFile().getFileName() == null) {
- return 0;
- }
-
- ResourceStatistics statistics =
- ((LoadMetadata) ld.getLoadFunc())
- .getStatistics(ld.getLFile().getFileName(), job);
-
- if (statistics == null || statistics.getSizeInBytes() == null) {
- return 0;
- }
-
- return statistics.getSizeInBytes();
+ org.apache.hadoop.mapreduce.Job job) throws IOException {
+ PigReducerEstimator estimator = conf.get(REDUCER_ESTIMATOR_KEY) == null ?
+ new InputSizeReducerEstimator() :
+ PigContext.instantiateObjectFromParams(conf,
+ REDUCER_ESTIMATOR_KEY, REDUCER_ESTIMATOR_ARG_KEY, PigReducerEstimator.class);
+
+ log.info("Using reducer estimator: " + estimator.getClass().getName());
+ int numberOfReducers = estimator.estimateNumberOfReducers(conf, lds, job);
+ conf.setInt("mapred.reduce.tasks", numberOfReducers);
+
+ log.info("Neither PARALLEL nor default parallelism is set for this job. Setting number of "
+ + "reducers to " + numberOfReducers);
+ return numberOfReducers;
}
public static class PigSecondaryKeyGroupComparator extends WritableComparator {
Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1326492&r1=1326491&r2=1326492&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Mon Apr 16 05:47:14 2012
@@ -38,6 +38,7 @@ import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Level;
import org.apache.pig.ExecType;
@@ -498,8 +499,45 @@ public class PigContext implements Seria
String msg = "Could not resolve " + name + " using imports: " + packageImportList.get();
throw new ExecException(msg, errCode, PigException.INPUT);
}
-
-
+
+ /**
+ * A common Pig pattern for initializing objects via system properties is to support passing
+ * something like this on the command line:
+ * <code>-Dpig.notification.listener=MyClass</code>
+ * <code>-Dpig.notification.listener.arg=myConstructorStringArg</code>
+ *
+ * This method instantiates the class named by classParamKey, passing the value of
+ * argParamKey as its single String constructor argument when that property is set.
+ * @param conf configuration from which both properties are read
+ * @param classParamKey the property used to identify the class
+ * @param argParamKey the property used to identify the class args
+ * @param clazz The class that is expected
+ * @return a new instance of the configured class, or null if classParamKey is not set
+ * @throws ExecException if the configured class is not of type clazz
+ */
+ public static <T> T instantiateObjectFromParams(Configuration conf,
+ String classParamKey,
+ String argParamKey,
+ Class<T> clazz) throws ExecException {
+ String className = conf.get(classParamKey);
+ if (className == null) {
+ return null;
+ }
+ // Read the optional constructor argument once instead of twice.
+ String arg = conf.get(argParamKey);
+ FuncSpec fs = (arg != null) ? new FuncSpec(className, arg) : new FuncSpec(className);
+
+ // Instantiate outside the try so a ClassCastException thrown by the class's own
+ // constructor is not misreported as a type mismatch.
+ Object instance = PigContext.instantiateFuncFromSpec(fs);
+ try {
+ return clazz.cast(instance);
+ } catch (ClassCastException e) {
+ throw new ExecException("The class defined by " + classParamKey +
+ " in conf is not of type " + clazz.getName(), e);
+ }
+ }
+
@SuppressWarnings({ "unchecked", "rawtypes" })
public static Object instantiateFuncFromSpec(FuncSpec funcSpec) {
Object ret;
Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1326492&r1=1326491&r2=1326492&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Mon Apr 16 05:47:14 2012
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.pig.test;
import java.io.File;
@@ -133,35 +150,7 @@ public class TestJobControlCompiler {
new org.apache.hadoop.mapreduce.Job(CONF)));
}
- @Test
- public void testGetInputSizeFromFs() throws Exception {
- long size = 2L * 1024 * 1024 * 1024;
- Assert.assertEquals(size, JobControlCompiler.getInputSize(
- CONF, Lists.newArrayList(createPOLoadWithSize(size, new PigStorage())),
- new org.apache.hadoop.mapreduce.Job(CONF)));
-
- Assert.assertEquals(size, JobControlCompiler.getInputSize(
- CONF,
- Lists.newArrayList(createPOLoadWithSize(size, new PigStorageWithStatistics())),
- new org.apache.hadoop.mapreduce.Job(CONF)));
-
- Assert.assertEquals(size * 2, JobControlCompiler.getInputSize(
- CONF,
- Lists.newArrayList(
- createPOLoadWithSize(size, new PigStorage()),
- createPOLoadWithSize(size, new PigStorageWithStatistics())),
- new org.apache.hadoop.mapreduce.Job(CONF)));
- }
-
- @Test
- public void testGetInputSizeFromLoader() throws Exception {
- long size = 2L * 1024 * 1024 * 1024;
- Assert.assertEquals(size, JobControlCompiler.getInputSizeFromLoader(
- createPOLoadWithSize(size, new PigStorageWithStatistics()),
- new org.apache.hadoop.mapreduce.Job(CONF)));
- }
-
- private static POLoad createPOLoadWithSize(long size, LoadFunc loadFunc) throws Exception {
+ public static POLoad createPOLoadWithSize(long size, LoadFunc loadFunc) throws Exception {
File file = File.createTempFile("tempFile", ".tmp");
file.deleteOnExit();
RandomAccessFile f = new RandomAccessFile(file, "rw");