You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by bi...@apache.org on 2012/04/17 03:29:45 UTC
svn commit: r1326867 - in
/pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer:
InputSizeReducerEstimator.java PigReducerEstimator.java
Author: billgraham
Date: Tue Apr 17 01:29:45 2012
New Revision: 1326867
URL: http://svn.apache.org/viewvc?rev=1326867&view=rev
Log:
PIG-2574 Make reducer estimator pluggable (missed new files)
Added:
pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java
Added: pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1326867&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Tue Apr 17 01:29:45 2012
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.impl.util.UriUtil;
+import org.apache.pig.impl.util.Utils;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Reducer estimator that derives the reducer count from the job's total input size.
+ * Two properties control the result:
+ * <ul>
+ * <li>pig.exec.reducers.bytes.per.reducer -
+ * how many bytes of input each reducer should handle (default is 1000*1000*1000)</li>
+ * <li>pig.exec.reducers.max -
+ * upper bound on the number of reducer tasks (default is 999)</li>
+ * </ul>
+ * When a loader implements LoadMetadata its reported statistics are used; otherwise the
+ * size is looked up directly from the filesystem.
+ * <p>
+ * For example, given the pig script
+ * <pre>
+ * a = load '/data/a';
+ * b = load '/data/b';
+ * c = join a by $0, b by $0;
+ * store c into '/tmp';
+ * </pre>
+ * with /data/a sized 1000*1000*1000 bytes and /data/b sized 2*1000*1000*1000 bytes,
+ * the estimate is (1000*1000*1000+2*1000*1000*1000)/(1000*1000*1000)=3 reducers.
+ *
+ */
+public class InputSizeReducerEstimator implements PigReducerEstimator {
+    private static final Log log = LogFactory.getLog(InputSizeReducerEstimator.class);
+
+    /**
+     * Determines the number of reducers to be used.
+     *
+     * @param conf the job configuration
+     * @param lds list of POLoads used in the job's physical plan
+     * @param job job instance
+     * @return estimated reducer count, clamped to [1, pig.exec.reducers.max]
+     * @throws java.io.IOException
+     */
+    @Override
+    public int estimateNumberOfReducers(Configuration conf, List<POLoad> lds, Job job) throws IOException {
+        long bytesTarget = conf.getLong(BYTES_PER_REDUCER_PARAM, DEFAULT_BYTES_PER_REDUCER);
+        int reducerCap = conf.getInt(MAX_REDUCER_COUNT_PARAM, DEFAULT_MAX_REDUCER_COUNT_PARAM);
+        long inputBytes = getTotalInputFileSize(conf, lds, job);
+
+        log.info("BytesPerReducer=" + bytesTarget + " maxReducers="
+            + reducerCap + " totalInputFileSize=" + inputBytes);
+
+        // Round up so any nonzero remainder gets its own reducer, then clamp to [1, cap].
+        int estimate = (int) Math.ceil((double) inputBytes / bytesTarget);
+        return Math.min(reducerCap, Math.max(1, estimate));
+    }
+
+    /**
+     * Sums the input size over as many inputs as possible. Inputs whose size neither the
+     * loader reports nor pig can determine itself are left out of the total.
+     */
+    static long getTotalInputFileSize(Configuration conf,
+            List<POLoad> lds, Job job) throws IOException {
+        long total = 0;
+        for (POLoad load : lds) {
+            long reported = getInputSizeFromLoader(load, job);
+            if (reported > 0) {
+                total += reported;
+            } else {
+                // The file location may be a comma-separated list of paths; walk each one.
+                for (String location : LoadFunc.getPathStrings(load.getLFile().getFileName())) {
+                    if (!UriUtil.isHDFSFileOrLocalOrS3N(location)) {
+                        continue;
+                    }
+                    Path path = new Path(location);
+                    FileSystem fs = path.getFileSystem(conf);
+                    FileStatus[] matches = fs.globStatus(path);
+                    if (matches == null) {
+                        continue;
+                    }
+                    for (FileStatus match : matches) {
+                        total += Utils.getPathLength(fs, match);
+                    }
+                }
+            }
+        }
+        return total;
+    }
+
+    /**
+     * Get the total input size in bytes by looking at statistics provided by
+     * loaders that implement @{link LoadMetadata}.
+     * @param ld
+     * @param job
+     * @return total input size in bytes, or 0 if unknown or incomplete
+     * @throws IOException on error
+     */
+    static long getInputSizeFromLoader(POLoad ld, Job job) throws IOException {
+        // instanceof is false for null, so this also guards a missing load func.
+        if (!(ld.getLoadFunc() instanceof LoadMetadata)
+                || ld.getLFile() == null
+                || ld.getLFile().getFileName() == null) {
+            return 0;
+        }
+
+        ResourceStatistics stats;
+        try {
+            stats = ((LoadMetadata) ld.getLoadFunc())
+                    .getStatistics(ld.getLFile().getFileName(), job);
+        } catch (Exception e) {
+            // Best effort: a failing loader should not abort estimation.
+            log.warn("Couldn't get statistics from LoadFunc: " + ld.getLoadFunc(), e);
+            return 0;
+        }
+
+        if (stats == null || stats.getSizeInBytes() == null) {
+            return 0;
+        }
+        return stats.getSizeInBytes();
+    }
+}
Added: pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java?rev=1326867&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java Tue Apr 17 01:29:45 2012
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface to implement when you want to use a custom approach to estimating
+ * the number of reducers for a job.
+ *
+ * @see InputSizeReducerEstimator
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface PigReducerEstimator {
+
+    // Interface members are implicitly public static final / public abstract,
+    // so the redundant modifiers are omitted.
+
+    /** Property naming how many bytes of input each reducer should handle. */
+    String BYTES_PER_REDUCER_PARAM = "pig.exec.reducers.bytes.per.reducer";
+    /** Property naming the maximum number of reducers to use. */
+    String MAX_REDUCER_COUNT_PARAM = "pig.exec.reducers.max";
+
+    // Long arithmetic up front so larger defaults would not silently overflow int.
+    long DEFAULT_BYTES_PER_REDUCER = 1000L * 1000 * 1000;
+    int DEFAULT_MAX_REDUCER_COUNT_PARAM = 999;
+
+    /**
+     * Estimate the number of reducers for a given job based on the collection
+     * of load funcs passed.
+     *
+     * @param conf the job configuration
+     * @param poLoadList list of POLoads used in the job's physical plan
+     * @param job job instance
+     * @return the number of reducers to use
+     * @throws IOException on error
+     */
+    int estimateNumberOfReducers(Configuration conf, List<POLoad> poLoadList, Job job)
+            throws IOException;
+}