Posted to commits@pig.apache.org by ju...@apache.org on 2012/03/16 23:06:43 UTC

svn commit: r1301787 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/impl/io/ src/org/apache/pig/impl/util/ test/org/apache/pig/test/

Author: julien
Date: Fri Mar 16 22:06:42 2012
New Revision: 1301787

URL: http://svn.apache.org/viewvc?rev=1301787&view=rev
Log:
PIG-2573: Automagically setting parallelism based on input file size does not work with HCatalog (traviscrawford via julien)

Added:
    pig/trunk/test/org/apache/pig/test/PigStorageWithStatistics.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/ResourceStatistics.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
    pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
    pig/trunk/src/org/apache/pig/impl/util/Utils.java
    pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
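
Background on the fix: the reducer estimate previously probed the filesystem for input sizes, which fails for loaders such as HCatalog's whose data is not addressed by a plain HDFS path. The commit instead asks the loader itself for statistics when it implements LoadMetadata, falling back to the filesystem otherwise. A simplified sketch of that lookup order (not the committed code; see JobControlCompiler.getInputSizeFromLoader in the diff below):

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.LoadFunc;
    import org.apache.pig.LoadMetadata;
    import org.apache.pig.ResourceStatistics;

    public class LoaderSizeSketch {
        /** Returns the loader-reported size in bytes, or -1 to signal "probe the filesystem". */
        public static long inputSizeInBytes(LoadFunc loader, String location, Job job)
                throws IOException {
            if (loader instanceof LoadMetadata) {
                ResourceStatistics stats = ((LoadMetadata) loader).getStatistics(location, job);
                if (stats != null && stats.getSizeInBytes() != null) {
                    return stats.getSizeInBytes();
                }
            }
            return -1; // e.g. a plain PigStorage load; the caller then globs the path
        }
    }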

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1301787&r1=1301786&r2=1301787&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Mar 16 22:06:42 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2573: Automagically setting parallelism based on input file size does not work with HCatalog (traviscrawford via julien)
+
 PIG-2541: Automatic record provenance (source tagging) for PigStorage (prkommireddi via daijy)
 
 PIG-2538: Add helper wrapper classes for StoreFunc (billgraham via dvryaboy)

Modified: pig/trunk/src/org/apache/pig/ResourceStatistics.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ResourceStatistics.java?rev=1301787&r1=1301786&r2=1301787&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/ResourceStatistics.java (original)
+++ pig/trunk/src/org/apache/pig/ResourceStatistics.java Fri Mar 16 22:06:42 2012
@@ -200,6 +200,17 @@ public class ResourceStatistics implemen
         this.mBytes = mBytes;
         return this;
     }
+
+    /**
+     * @return the size from getmBytes(), converted to bytes, or null if unset.
+     */
+    public Long getSizeInBytes() {
+        // Ideally the size would be stored in bytes and getmBytes would convert
+        // that number. However, mBytes is public, so we cannot remove it or
+        // guarantee it would stay in sync with a size-in-bytes field.
+        return getmBytes() == null ? null : getmBytes() * 1024 * 1024;
+    }
+
     public Long getNumRecords() {
         return numRecords;
     }
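
Since statistics store the size in megabytes (mBytes), the new accessor converts on the way out and preserves null for unset values. A minimal usage sketch with hypothetical values:

    ResourceStatistics stats = new ResourceStatistics();
    stats.setmBytes(3L);                     // stored in megabytes
    Long bytes = stats.getSizeInBytes();     // 3L * 1024 * 1024 = 3145728
    Long unset = new ResourceStatistics().getSizeInBytes(); // null while mBytes is unset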

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1301787&r1=1301786&r2=1301787&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Mar 16 22:06:42 2012
@@ -49,7 +49,9 @@ import org.apache.hadoop.mapred.jobcontr
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
 import org.apache.pig.PigException;
+import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
@@ -319,7 +321,7 @@ public class JobControlCompiler{
      * the reduce plan and serializes it so that the PigMapReduce class can use it to package
      * the indexed tuples received by the reducer.
      * @param mro - The MapReduceOper for which the JobConf is required
-     * @param conf - the Configuration object from which JobConf is built
+     * @param config - the Configuration object from which JobConf is built
      * @param pigContext - The PigContext passed on from execution engine
      * @return Job corresponding to mro
      * @throws JobCreationException
@@ -595,7 +597,7 @@ public class JobControlCompiler{
 		else if (pigContext.defaultParallel > 0)
                     conf.set("mapred.reduce.tasks", ""+pigContext.defaultParallel);
                 else
-                    estimateNumberOfReducers(conf,lds);
+                    estimateNumberOfReducers(conf, lds, nwJob);
                 
                 if (mro.customPartitioner != null)
                 	nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
@@ -737,28 +739,44 @@ public class JobControlCompiler{
             throw new JobCreationException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     /**
-     * Currently the estimation of reducer number is only applied to HDFS, The estimation is based on the input size of data storage on HDFS.
-     * Two parameters can been configured for the estimation, one is pig.exec.reducers.max which constrain the maximum number of reducer task (default is 999). The other
-     * is pig.exec.reducers.bytes.per.reducer(default value is 1000*1000*1000) which means the how much data can been handled for each reducer.
+     * Estimate the number of reducers based on input size.
+     * The estimate is controlled by two properties:
+     * <ul>
+     *     <li>pig.exec.reducers.bytes.per.reducer -
+     *     how many bytes of input each reducer handles (default is 1000*1000*1000)</li>
+     *     <li>pig.exec.reducers.max -
+     *     caps the number of reducer tasks (default is 999)</li>
+     * </ul>
+     * If the loader implements LoadMetadata, the input size it reports is used; otherwise
+     * Pig attempts to determine the size from the filesystem.
+     * <p>
      * e.g. the following is your pig script
+     * <pre>
      * a = load '/data/a';
      * b = load '/data/b';
      * c = join a by $0, b by $0;
      * store c into '/tmp';
-     * 
+     * </pre>
      * The size of /data/a is 1000*1000*1000, and size of /data/b is 2*1000*1000*1000.
      * Then the estimated reducer number is (1000*1000*1000+2*1000*1000*1000)/(1000*1000*1000)=3
-     * @param conf
-     * @param lds
-     * @throws IOException
+     * </p>
+     *
+     * @param conf read settings from this configuration
+     * @param lds inputs to estimate number of reducers for
+     * @param job job, passed through to loaders that report statistics
+     * @throws IOException on error
+     * @return estimated number of reducers necessary for this input
      */
-    static int estimateNumberOfReducers(Configuration conf, List<POLoad> lds) throws IOException {
-           long bytesPerReducer = conf.getLong("pig.exec.reducers.bytes.per.reducer", (1000 * 1000 * 1000));
+    public static int estimateNumberOfReducers(Configuration conf, List<POLoad> lds,
+            org.apache.hadoop.mapreduce.Job job) throws IOException {
+        long bytesPerReducer =
+                conf.getLong("pig.exec.reducers.bytes.per.reducer", (1000 * 1000 * 1000));
         int maxReducers = conf.getInt("pig.exec.reducers.max", 999);
-        long totalInputFileSize = getTotalInputFileSize(conf, lds);
-       
+
+        long totalInputFileSize = getInputSize(conf, lds, job);
+
         log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
             + maxReducers + " totalInputFileSize=" + totalInputFileSize);
         
@@ -771,56 +789,65 @@ public class JobControlCompiler{
         return reducers;
     }
 
-    private static long getTotalInputFileSize(Configuration conf, List<POLoad> lds) throws IOException {
-        List<String> inputs = new ArrayList<String>();
-        if(lds!=null && lds.size()>0){
-            for (POLoad ld : lds) {
-                inputs.add(ld.getLFile().getFileName());
-            }
-        }
-        long size = 0;
-        
-        for (String input : inputs){
-            //Using custom uri parsing because 'new Path(location).toUri()' fails
-            // for some valid uri's (eg jdbc style), and 'new Uri(location)' fails
-            // for valid hdfs paths that contain curly braces
-            if(!UriUtil.isHDFSFileOrLocalOrS3N(input)){
-                //skip  if it is not hdfs or local file or s3n
+    /**
+     * Get the input size for as many inputs as possible. Inputs that neither report
+     * their own size nor can be sized by Pig itself are excluded from the total.
+     */
+    public static long getInputSize(Configuration conf, List<POLoad> lds,
+            org.apache.hadoop.mapreduce.Job job) throws IOException {
+        long totalInputFileSize = 0;
+        for (POLoad ld : lds) {
+            long size = getInputSizeFromLoader(ld, job);
+            if (size > 0) {
+                totalInputFileSize += size;
                 continue;
             }
-
-            //the input file location might be a list of comma separeated files, 
+            // the input file location might be a list of comma separated files,
             // separate them out
-            for(String location : LoadFunc.getPathStrings(input)){
-                if(! UriUtil.isHDFSFileOrLocalOrS3N(location)){
-                    continue;
-                }
-                Path path = new Path(location);
-                FileSystem fs = path.getFileSystem(conf);
-                FileStatus[] status=fs.globStatus(path);
-                if (status != null){
-                    for (FileStatus s : status){
-                        size += getPathLength(fs, s);
+            for (String location : LoadFunc.getPathStrings(ld.getLFile().getFileName())) {
+                if (UriUtil.isHDFSFileOrLocalOrS3N(location)) {
+                    Path path = new Path(location);
+                    FileSystem fs = path.getFileSystem(conf);
+                    FileStatus[] status = fs.globStatus(path);
+                    if (status != null) {
+                        for (FileStatus s : status) {
+                            totalInputFileSize += Utils.getPathLength(fs, s);
+                        }
                     }
                 }
             }
         }
-        return size;
-   }
-   
-    private static long getPathLength(FileSystem fs,FileStatus status) throws IOException{
-        if (!status.isDir()){
-            return status.getLen();
-        }else{
-            FileStatus[] children = fs.listStatus(status.getPath());
-            long size=0;
-            for (FileStatus child : children){
-                size +=getPathLength(fs, child);
-            }
-            return size;
+        return totalInputFileSize;
+    }
+
+    /**
+     * Get the input size in bytes from the statistics provided by a loader
+     * that implements {@link LoadMetadata}.
+     * @param ld load operator whose loader may report statistics
+     * @param job job passed through to the loader
+     * @return input size in bytes, or 0 if unknown or incomplete
+     * @throws IOException on error
+     */
+    public static long getInputSizeFromLoader(
+            POLoad ld, org.apache.hadoop.mapreduce.Job job) throws IOException {
+        if (ld.getLoadFunc() == null
+                || !(ld.getLoadFunc() instanceof LoadMetadata)
+                || ld.getLFile() == null
+                || ld.getLFile().getFileName() == null) {
+            return 0;
         }
+
+        ResourceStatistics statistics =
+                ((LoadMetadata) ld.getLoadFunc())
+                        .getStatistics(ld.getLFile().getFileName(), job);
+
+        if (statistics == null || statistics.getSizeInBytes() == null) {
+            return 0;
+        }
+
+        return statistics.getSizeInBytes();
     }
-        
+
     public static class PigSecondaryKeyGroupComparator extends WritableComparator {
         public PigSecondaryKeyGroupComparator() {
 //            super(TupleFactory.getInstance().tupleClass(), true);
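
The hunk above ends before the arithmetic that turns totalInputFileSize into a reducer count. Assuming the ceiling-and-clamp computation described in the javadoc (and exercised by the new tests further down), the estimate behaves like this sketch:

    // Assumption: mirrors the documented behavior, not the verbatim committed code.
    static int estimate(long totalInputFileSize, long bytesPerReducer, int maxReducers) {
        int reducers = (int) Math.ceil((double) totalInputFileSize / bytesPerReducer);
        reducers = Math.max(reducers, 1);       // never fewer than one reducer
        return Math.min(reducers, maxReducers); // capped by pig.exec.reducers.max
    }
    // estimate(3L * 1000 * 1000 * 1000, 1000 * 1000 * 1000, 999) == 3, matching the
    // javadoc example; estimate(2L * 1000 * 1000 * 999, 1000 * 1000 * 1000, 999) == 2,
    // matching testEstimateNumberOfReducers below.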

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=1301787&r1=1301786&r2=1301787&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Fri Mar 16 22:06:42 2012
@@ -137,7 +137,8 @@ public class SampleOptimizer extends MRO
             Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
             int rp = 1;
             try {
-                rp = JobControlCompiler.estimateNumberOfReducers(conf, lds);
+                rp = JobControlCompiler.estimateNumberOfReducers(
+                        conf, lds, new org.apache.hadoop.mapreduce.Job(conf));
             } catch (IOException e) {
                 log.warn("Failed to estimate number of reducers", e);
             }

Modified: pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1301787&r1=1301786&r2=1301787&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java Fri Mar 16 22:06:42 2012
@@ -30,8 +30,12 @@ import org.apache.hadoop.mapreduce.JobID
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.pig.Expression;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.Tuple;
@@ -49,7 +53,7 @@ import org.apache.pig.impl.plan.Operator
  * 1) construct an object using the constructor
  * 2) Call getNext() in a loop till it returns null
  */
-public class ReadToEndLoader extends LoadFunc {
+public class ReadToEndLoader extends LoadFunc implements LoadMetadata {
 
     /**
      * the wrapped LoadFunc which will do the actual reading
@@ -58,7 +62,7 @@ public class ReadToEndLoader extends Loa
     
     /**
      * the Configuration object used to locate the input location - this will
-     * be used to call {@link LoadFunc#setLocation(String, Configuration)} on 
+     * be used to call {@link LoadFunc#setLocation(String, Job)} on
      * the wrappedLoadFunc
      */
     private Configuration conf;
@@ -257,6 +261,37 @@ public class ReadToEndLoader extends Loa
         //no-op
     }
 
+    @Override
+    public ResourceSchema getSchema(String location, Job job) throws IOException {
+        if (wrappedLoadFunc instanceof LoadMetadata) {
+            return ((LoadMetadata) wrappedLoadFunc).getSchema(location, job);
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+        if (wrappedLoadFunc instanceof LoadMetadata) {
+            return ((LoadMetadata) wrappedLoadFunc).getStatistics(location, job);
+        } else {
+            return null;
+        }
+    }
 
-   
+    @Override
+    public String[] getPartitionKeys(String location, Job job) throws IOException {
+        if (wrappedLoadFunc instanceof LoadMetadata) {
+            return ((LoadMetadata) wrappedLoadFunc).getPartitionKeys(location, job);
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public void setPartitionFilter(Expression partitionFilter) throws IOException {
+        if (wrappedLoadFunc instanceof LoadMetadata) {
+             ((LoadMetadata) wrappedLoadFunc).setPartitionFilter(partitionFilter);
+        }
+    }
 }
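
With ReadToEndLoader now implementing LoadMetadata, metadata calls pass straight through to the wrapped loader. A hedged usage sketch (the four-argument constructor is assumed from existing callers; a wrapped loader without LoadMetadata simply yields null):

    LoadFunc wrapped = new PigStorageWithStatistics();
    ReadToEndLoader loader = new ReadToEndLoader(wrapped, conf, "/data/a", 0);
    ResourceStatistics stats = loader.getStatistics("/data/a", new Job(conf));
    // With a plain PigStorage as the wrapped loader, stats would be null instead.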

Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1301787&r1=1301786&r2=1301787&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Fri Mar 16 22:06:42 2012
@@ -32,6 +32,7 @@ import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
@@ -309,6 +310,21 @@ public class Utils {
     		return in;
     	}
     }
-    
+
+    /**
+     * Returns the total number of bytes for this file or, if a directory, of all files beneath it.
+     */
+    public static long getPathLength(FileSystem fs, FileStatus status) throws IOException {
+        if (!status.isDir()) {
+            return status.getLen();
+        } else {
+            FileStatus[] children = fs.listStatus(status.getPath());
+            long size = 0;
+            for (FileStatus child : children) {
+                size += getPathLength(fs, child);
+            }
+            return size;
+        }
+    }
 
 }
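
A minimal usage sketch for the now-shared helper, mirroring how JobControlCompiler and the test loader below call it:

    Path path = new Path("/data/a");
    FileSystem fs = path.getFileSystem(new Configuration());
    long totalBytes = 0;
    FileStatus[] matches = fs.globStatus(path); // may be null if nothing matches
    if (matches != null) {
        for (FileStatus s : matches) {
            totalBytes += Utils.getPathLength(fs, s); // recurses into directories
        }
    }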

Added: pig/trunk/test/org/apache/pig/test/PigStorageWithStatistics.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/PigStorageWithStatistics.java?rev=1301787&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/PigStorageWithStatistics.java (added)
+++ pig/trunk/test/org/apache/pig/test/PigStorageWithStatistics.java Fri Mar 16 22:06:42 2012
@@ -0,0 +1,50 @@
+package org.apache.pig.test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.util.UriUtil;
+import org.apache.pig.impl.util.Utils;
+
+import java.io.IOException;
+
+public class PigStorageWithStatistics extends PigStorage {
+    private String loc = null;
+
+    @Override
+    public void setLocation(String location, Job job)
+            throws IOException {
+        super.setLocation(location, job);
+        loc = location;
+    }
+
+    @Override
+    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+        ResourceStatistics stats = new ResourceStatistics();
+        stats.setmBytes(getInputmBytes());
+        return stats;
+    }
+
+    private Long getInputmBytes() throws IOException {
+        if (loc == null) {
+            return 0L;
+        }
+
+        long inputBytes = 0L;
+        for (String location : getPathStrings(loc)) {
+            Path path = new Path(location);
+            FileSystem fs = path.getFileSystem(new Configuration());
+            FileStatus[] status = fs.globStatus(path);
+            if (status != null) {
+                for (FileStatus s : status) {
+                    inputBytes += Utils.getPathLength(fs, s);
+                }
+            }
+        }
+        return inputBytes / 1024 / 1024;
+    }
+}
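
Note that getInputmBytes() truncates to whole megabytes via integer division, so the loader under-reports any remainder; the tests below therefore use sizes that are exact multiples of 1024 * 1024 bytes. For illustration:

    long onDisk = 3L * 1024 * 1024 + 512 * 1024; // 3.5 MB of input
    long reportedMBytes = onDisk / 1024 / 1024;  // integer division -> 3
    // getSizeInBytes() then yields 3 * 1024 * 1024, under-counting by 512 KB.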

Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1301787&r1=1301786&r2=1301787&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Fri Mar 16 22:06:42 2012
@@ -7,6 +7,7 @@ import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.RandomAccessFile;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.Arrays;
@@ -24,22 +25,30 @@ import javax.tools.JavaFileObject;
 import javax.tools.StandardJavaFileManager;
 import javax.tools.ToolProvider;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestJobControlCompiler {
 
+    private static final Configuration CONF = new Configuration();
+
   /**
    * specifically tests that REGISTERED jars get added to distributed cache instead of merged into 
    * the job jar
@@ -77,8 +86,7 @@ public class TestJobControlCompiler {
     PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
     pigContext.connect();
     pigContext.addJar(tmpFile.getAbsolutePath());
-    Configuration conf = new Configuration();
-    JobControlCompiler jobControlCompiler = new JobControlCompiler(pigContext, conf);
+    JobControlCompiler jobControlCompiler = new JobControlCompiler(pigContext, CONF);
     MROperPlan plan = new MROperPlan();
     MapReduceOper mro = new MapReduceOper(new OperatorKey());
     mro.UDFs = new HashSet<String>();
@@ -107,6 +115,68 @@ public class TestJobControlCompiler {
 
   }
 
+    @Test
+    public void testEstimateNumberOfReducers() throws Exception {
+        Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(CONF,
+                Lists.newArrayList(createPOLoadWithSize(2L * 1000 * 1000 * 999,
+                        new PigStorage())),
+                new org.apache.hadoop.mapreduce.Job(CONF)));
+
+        Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(CONF,
+                Lists.newArrayList(createPOLoadWithSize(2L * 1000 * 1000 * 1000,
+                        new PigStorage())),
+                new org.apache.hadoop.mapreduce.Job(CONF)));
+
+        Assert.assertEquals(3, JobControlCompiler.estimateNumberOfReducers(CONF,
+                Lists.newArrayList(createPOLoadWithSize(2L * 1000 * 1000 * 1001,
+                        new PigStorage())),
+                new org.apache.hadoop.mapreduce.Job(CONF)));
+    }
+
+    @Test
+    public void testGetInputSizeFromFs() throws Exception {
+        long size = 2L * 1024 * 1024 * 1024;
+        Assert.assertEquals(size, JobControlCompiler.getInputSize(
+                CONF, Lists.newArrayList(createPOLoadWithSize(size, new PigStorage())),
+                new org.apache.hadoop.mapreduce.Job(CONF)));
+
+        Assert.assertEquals(size, JobControlCompiler.getInputSize(
+                CONF,
+                Lists.newArrayList(createPOLoadWithSize(size, new PigStorageWithStatistics())),
+                new org.apache.hadoop.mapreduce.Job(CONF)));
+
+        Assert.assertEquals(size * 2, JobControlCompiler.getInputSize(
+                CONF,
+                Lists.newArrayList(
+                        createPOLoadWithSize(size, new PigStorage()),
+                        createPOLoadWithSize(size, new PigStorageWithStatistics())),
+                new org.apache.hadoop.mapreduce.Job(CONF)));
+    }
+
+    @Test
+    public void testGetInputSizeFromLoader() throws Exception {
+        long size = 2L * 1024 * 1024 * 1024;
+        Assert.assertEquals(size, JobControlCompiler.getInputSizeFromLoader(
+                createPOLoadWithSize(size, new PigStorageWithStatistics()),
+                new org.apache.hadoop.mapreduce.Job(CONF)));
+    }
+
+    private static POLoad createPOLoadWithSize(long size, LoadFunc loadFunc) throws Exception {
+        File file = File.createTempFile("tempFile", ".tmp");
+        file.deleteOnExit();
+        RandomAccessFile f = new RandomAccessFile(file, "rw");
+        f.setLength(size);
+
+        loadFunc.setLocation(file.getAbsolutePath(), new org.apache.hadoop.mapreduce.Job(CONF));
+        FuncSpec funcSpec = new FuncSpec(loadFunc.getClass().getCanonicalName());
+        POLoad poLoad = new POLoad(new OperatorKey(), loadFunc);
+        poLoad.setLFile(new FileSpec(file.getAbsolutePath(), funcSpec));
+        poLoad.setPc(new PigContext());
+        poLoad.setUp();
+
+        return poLoad;
+    }
+
   /**
    * checks if the given file name is in the jar 
    * @param jarFile the jar to check