You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by bi...@apache.org on 2012/04/17 03:34:13 UTC

svn commit: r1326872 - in /pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer: InputSizeReducerEstimator.java PigReducerEstimator.java

Author: billgraham
Date: Tue Apr 17 01:34:13 2012
New Revision: 1326872

URL: http://svn.apache.org/viewvc?rev=1326872&view=rev
Log:
PIG-2574 Make reducer estimator pluggable (missed new files)

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1326872&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Tue Apr 17 01:34:13 2012
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.impl.util.UriUtil;
+import org.apache.pig.impl.util.Utils;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Class that estimates the number of reducers based on input size.
+ * Number of reducers is based on two properties:
+ * <ul>
+ *     <li>pig.exec.reducers.bytes.per.reducer -
+ *     how many bytes of input per reducer (default is 1000*1000*1000)</li>
+ *     <li>pig.exec.reducers.max -
+ *     constrain the maximum number of reducer tasks (default is 999)</li>
+ * </ul>
+ * If the loader implements LoadMetadata and reports a positive size, that size is used;
+ * otherwise the size is determined from the filesystem.
+ * <p>
+ * e.g. the following is your pig script
+ * <pre>
+ * a = load '/data/a';
+ * b = load '/data/b';
+ * c = join a by $0, b by $0;
+ * store c into '/tmp';
+ * </pre>
+ * and the size of /data/a is 1000*1000*1000, and the size of /data/b is
+ * 2*1000*1000*1000 then the estimated number of reducers to use will be
+ * (1000*1000*1000+2*1000*1000*1000)/(1000*1000*1000)=3
+ * (the quotient is rounded up, raised to at least 1, then capped at pig.exec.reducers.max).
+ */
+public class InputSizeReducerEstimator implements PigReducerEstimator {
+    private static final Log log = LogFactory.getLog(InputSizeReducerEstimator.class);
+
+    /**
+     * Determines the number of reducers to be used.
+     *
+     * @param conf the job configuration
+     * @param lds list of POLoads used in the job's physical plan
+     * @param job job instance
+     * @return ceil(total input size / bytes per reducer), at least 1, then capped at the max
+     * @throws java.io.IOException if resolving the input sizes on the filesystem fails
+     */
+    @Override
+    public int estimateNumberOfReducers(Configuration conf, List<POLoad> lds, Job job) throws IOException {
+        long bytesPerReducer = conf.getLong(BYTES_PER_REDUCER_PARAM, DEFAULT_BYTES_PER_REDUCER);
+        int maxReducers = conf.getInt(MAX_REDUCER_COUNT_PARAM, DEFAULT_MAX_REDUCER_COUNT_PARAM);
+        long totalInputFileSize = getTotalInputFileSize(conf, lds, job);
+
+        log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+            + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+
+        int reducers = (int) Math.ceil((double) totalInputFileSize / bytesPerReducer);
+        // Clamp to at least one reducer first, then apply the configured ceiling.
+        return Math.min(maxReducers, Math.max(1, reducers));
+    }
+
+    /**
+     * Sum the input sizes of as many inputs as possible. Inputs whose size is neither
+     * reported by their loader nor resolvable from the filesystem are left out of the total.
+     */
+    static long getTotalInputFileSize(Configuration conf, List<POLoad> lds, Job job) throws IOException {
+        long totalInputFileSize = 0;
+        for (POLoad ld : lds) {
+            long size = getInputSizeFromLoader(ld, job);
+            if (size > 0) {
+                totalInputFileSize += size;
+                continue;
+            }
+            if (ld.getLFile() == null || ld.getLFile().getFileName() == null) {
+                continue; // no location to look up, so this input is left out
+            }
+            // the location may be a comma-separated list of paths; split them out
+            for (String location : LoadFunc.getPathStrings(ld.getLFile().getFileName())) {
+                if (UriUtil.isHDFSFileOrLocalOrS3N(location)) {
+                    Path path = new Path(location);
+                    FileSystem fs = path.getFileSystem(conf);
+                    FileStatus[] status = fs.globStatus(path);
+                    if (status != null) {
+                        for (FileStatus s : status) {
+                            totalInputFileSize += Utils.getPathLength(fs, s);
+                        }
+                    }
+                }
+            }
+        }
+        return totalInputFileSize;
+    }
+
+    /**
+     * Get the input size in bytes from the statistics reported by a loader that
+     * implements {@link LoadMetadata}.
+     * @param ld the load operator whose loader is queried
+     * @param job job instance passed through to {@code LoadMetadata.getStatistics}
+     * @return input size in bytes, or 0 if unknown or incomplete
+     * @throws IOException declared only; statistics failures are logged and reported as 0
+     */
+    static long getInputSizeFromLoader(POLoad ld, Job job) throws IOException {
+        if (ld.getLoadFunc() == null
+                || !(ld.getLoadFunc() instanceof LoadMetadata)
+                || ld.getLFile() == null
+                || ld.getLFile().getFileName() == null) {
+            return 0;
+        }
+
+        ResourceStatistics statistics;
+        try {
+            statistics = ((LoadMetadata) ld.getLoadFunc())
+                        .getStatistics(ld.getLFile().getFileName(), job);
+        } catch (Exception e) {
+            log.warn("Couldn't get statistics from LoadFunc: " + ld.getLoadFunc(), e);
+            return 0;
+        }
+
+        if (statistics == null || statistics.getSizeInBytes() == null) {
+            return 0;
+        }
+
+        return statistics.getSizeInBytes();
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java?rev=1326872&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java Tue Apr 17 01:34:13 2012
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface to implement when you want to use a custom approach to estimating
+ * the number of reducers for a job.
+ *
+ * @see InputSizeReducerEstimator
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface PigReducerEstimator {
+    /** Configuration key: how many bytes of input to assign to each reducer. */
+    String BYTES_PER_REDUCER_PARAM = "pig.exec.reducers.bytes.per.reducer";
+    /** Configuration key: upper bound on the number of reducers. */
+    String MAX_REDUCER_COUNT_PARAM = "pig.exec.reducers.max";
+    /** Default for {@link #BYTES_PER_REDUCER_PARAM}; computed in long to rule out int overflow. */
+    long DEFAULT_BYTES_PER_REDUCER = 1000L * 1000 * 1000;
+    /** Default for {@link #MAX_REDUCER_COUNT_PARAM} (a count, despite the _PARAM suffix). */
+    int DEFAULT_MAX_REDUCER_COUNT_PARAM = 999;
+
+    /**
+     * Estimate the number of reducers for a job from the POLoads it reads.
+     *
+     * @param conf the job configuration
+     * @param poLoadList list of POLoads used in the job's physical plan
+     * @param job job instance
+     * @return the number of reducers to use
+     * @throws IOException if the estimate cannot be computed
+     */
+    int estimateNumberOfReducers(Configuration conf, List<POLoad> poLoadList, Job job) throws IOException;
+}