You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ju...@apache.org on 2012/03/16 23:06:43 UTC
svn commit: r1301787 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/impl/io/ src/org/apache/pig/impl/util/
test/org/apache/pig/test/
Author: julien
Date: Fri Mar 16 22:06:42 2012
New Revision: 1301787
URL: http://svn.apache.org/viewvc?rev=1301787&view=rev
Log:
PIG-2573: Automagically setting parallelism based on input file size does not work with HCatalog (traviscrawford via julien)
Added:
pig/trunk/test/org/apache/pig/test/PigStorageWithStatistics.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/ResourceStatistics.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
pig/trunk/src/org/apache/pig/impl/util/Utils.java
pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1301787&r1=1301786&r2=1301787&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Mar 16 22:06:42 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2573: Automagically setting parallelism based on input file size does not work with HCatalog (traviscrawford via julien)
+
PIG-2541: Automatic record provenance (source tagging) for PigStorage (prkommireddi via daijy)
PIG-2538: Add helper wrapper classes for StoreFunc (billgraham via dvryaboy)
Modified: pig/trunk/src/org/apache/pig/ResourceStatistics.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ResourceStatistics.java?rev=1301787&r1=1301786&r2=1301787&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/ResourceStatistics.java (original)
+++ pig/trunk/src/org/apache/pig/ResourceStatistics.java Fri Mar 16 22:06:42 2012
@@ -200,6 +200,17 @@ public class ResourceStatistics implemen
this.mBytes = mBytes;
return this;
}
+
+ /**
+ * Returns the size in bytes, derived from {@link #getmBytes()}.
+ * <p>
+ * Ideally size would be stored in bytes, and getmBytes would convert
+ * that number. However, mBytes is public so we cannot remove it, or
+ * guarantee it stays in sync with size in bytes.
+ *
+ * @return {@code getmBytes() * 1024 * 1024}, or {@code null} when
+ * {@code getmBytes()} is {@code null}
+ */
+ public Long getSizeInBytes() {
+ // read the megabyte count once so the null check and the conversion
+ // operate on the same value
+ Long megabytes = getmBytes();
+ return megabytes == null ? null : megabytes * 1024L * 1024L;
+ }
+
/**
* @return the {@code numRecords} value held by this object (may be {@code null})
*/
public Long getNumRecords() {
return numRecords;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1301787&r1=1301786&r2=1301787&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Mar 16 22:06:42 2012
@@ -49,7 +49,9 @@ import org.apache.hadoop.mapred.jobcontr
import org.apache.pig.ComparisonFunc;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
import org.apache.pig.PigException;
+import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
@@ -319,7 +321,7 @@ public class JobControlCompiler{
* the reduce plan and serializes it so that the PigMapReduce class can use it to package
* the indexed tuples received by the reducer.
* @param mro - The MapReduceOper for which the JobConf is required
- * @param conf - the Configuration object from which JobConf is built
+ * @param config - the Configuration object from which JobConf is built
* @param pigContext - The PigContext passed on from execution engine
* @return Job corresponding to mro
* @throws JobCreationException
@@ -595,7 +597,7 @@ public class JobControlCompiler{
else if (pigContext.defaultParallel > 0)
conf.set("mapred.reduce.tasks", ""+pigContext.defaultParallel);
else
- estimateNumberOfReducers(conf,lds);
+ estimateNumberOfReducers(conf, lds, nwJob);
if (mro.customPartitioner != null)
nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
@@ -737,28 +739,44 @@ public class JobControlCompiler{
throw new JobCreationException(msg, errCode, PigException.BUG, e);
}
}
-
+
/**
- * Currently the estimation of reducer number is only applied to HDFS, The estimation is based on the input size of data storage on HDFS.
- * Two parameters can been configured for the estimation, one is pig.exec.reducers.max which constrain the maximum number of reducer task (default is 999). The other
- * is pig.exec.reducers.bytes.per.reducer(default value is 1000*1000*1000) which means the how much data can been handled for each reducer.
+ * Estimate the number of reducers based on input size.
+ * Number of reducers is based on two properties:
+ * <ul>
+ * <li>pig.exec.reducers.bytes.per.reducer -
+ * how many bytes of input per reducer (default is 1000*1000*1000)</li>
+ * <li>pig.exec.reducers.max -
+ * the maximum number of reducer tasks (default is 999)</li>
+ * </ul>
+ * If the loader implements LoadMetadata, the input size it reports is used; otherwise
+ * Pig attempts to determine the size from the filesystem.
+ * <p>
* e.g. the following is your pig script
+ * <pre>
* a = load '/data/a';
* b = load '/data/b';
* c = join a by $0, b by $0;
* store c into '/tmp';
- *
+ * </pre>
* The size of /data/a is 1000*1000*1000, and size of /data/b is 2*1000*1000*1000.
* Then the estimated reducer number is (1000*1000*1000+2*1000*1000*1000)/(1000*1000*1000)=3
- * @param conf
- * @param lds
- * @throws IOException
+ * </p>
+ *
+ * @param conf read settings from this configuration
+ * @param lds inputs to estimate number of reducers for
+ * @param job job configuration
+ * @throws IOException on error
+ * @return estimated number of reducers necessary for this input
*/
- static int estimateNumberOfReducers(Configuration conf, List<POLoad> lds) throws IOException {
- long bytesPerReducer = conf.getLong("pig.exec.reducers.bytes.per.reducer", (1000 * 1000 * 1000));
+ public static int estimateNumberOfReducers(Configuration conf, List<POLoad> lds,
+ org.apache.hadoop.mapreduce.Job job) throws IOException {
+ long bytesPerReducer =
+ conf.getLong("pig.exec.reducers.bytes.per.reducer", (1000 * 1000 * 1000));
int maxReducers = conf.getInt("pig.exec.reducers.max", 999);
- long totalInputFileSize = getTotalInputFileSize(conf, lds);
-
+
+ long totalInputFileSize = getInputSize(conf, lds, job);
+
log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+ maxReducers + " totalInputFileSize=" + totalInputFileSize);
@@ -771,56 +789,65 @@ public class JobControlCompiler{
return reducers;
}
- private static long getTotalInputFileSize(Configuration conf, List<POLoad> lds) throws IOException {
- List<String> inputs = new ArrayList<String>();
- if(lds!=null && lds.size()>0){
- for (POLoad ld : lds) {
- inputs.add(ld.getLFile().getFileName());
- }
- }
- long size = 0;
-
- for (String input : inputs){
- //Using custom uri parsing because 'new Path(location).toUri()' fails
- // for some valid uri's (eg jdbc style), and 'new Uri(location)' fails
- // for valid hdfs paths that contain curly braces
- if(!UriUtil.isHDFSFileOrLocalOrS3N(input)){
- //skip if it is not hdfs or local file or s3n
+ /**
+ * Get the input size for as many inputs as possible. The size of an input is
+ * taken from its loader when the loader reports statistics (see
+ * {@link #getInputSizeFromLoader}); otherwise it is looked up on the
+ * filesystem for HDFS, local and S3N locations. Inputs whose size is neither
+ * reported nor discoverable are excluded from the total.
+ *
+ * @param conf configuration used to resolve filesystems
+ * @param lds inputs to measure; {@code null} is treated as no inputs
+ * @param job job handed to loaders that report statistics
+ * @return total known input size in bytes
+ * @throws IOException on error
+ */
+ public static long getInputSize(Configuration conf, List<POLoad> lds,
+ org.apache.hadoop.mapreduce.Job job) throws IOException {
+ long totalInputFileSize = 0;
+ if (lds == null) {
+ // treat a null list as "no inputs" instead of failing with an NPE
+ return totalInputFileSize;
+ }
+ for (POLoad ld : lds) {
+ long size = getInputSizeFromLoader(ld, job);
+ if (size > 0) {
+ totalInputFileSize += size;
continue;
}
+ if (ld.getLFile() == null || ld.getLFile().getFileName() == null) {
+ // no location to look up on the filesystem
+ continue;
+ }
-
- //the input file location might be a list of comma separeated files,
+ // the input file location might be a list of comma separated files,
// separate them out
- for(String location : LoadFunc.getPathStrings(input)){
- if(! UriUtil.isHDFSFileOrLocalOrS3N(location)){
- continue;
- }
- Path path = new Path(location);
- FileSystem fs = path.getFileSystem(conf);
- FileStatus[] status=fs.globStatus(path);
- if (status != null){
- for (FileStatus s : status){
- size += getPathLength(fs, s);
+ for (String location : LoadFunc.getPathStrings(ld.getLFile().getFileName())) {
+ if (UriUtil.isHDFSFileOrLocalOrS3N(location)) {
+ Path path = new Path(location);
+ FileSystem fs = path.getFileSystem(conf);
+ FileStatus[] status = fs.globStatus(path);
+ if (status != null) {
+ for (FileStatus s : status) {
+ totalInputFileSize += Utils.getPathLength(fs, s);
+ }
+ }
}
}
}
- return size;
- }
-
- private static long getPathLength(FileSystem fs,FileStatus status) throws IOException{
- if (!status.isDir()){
- return status.getLen();
- }else{
- FileStatus[] children = fs.listStatus(status.getPath());
- long size=0;
- for (FileStatus child : children){
- size +=getPathLength(fs, child);
- }
- return size;
+ return totalInputFileSize;
+ }
+
+ /**
+ * Get the total input size in bytes by looking at statistics provided by
+ * loaders that implement {@link LoadMetadata}.
+ *
+ * @param ld load operator whose loader is queried for statistics
+ * @param job job passed to {@link LoadMetadata#getStatistics}
+ * @return total input size in bytes, or 0 if unknown or incomplete
+ * @throws IOException on error
+ */
+ public static long getInputSizeFromLoader(
+ POLoad ld, org.apache.hadoop.mapreduce.Job job) throws IOException {
+ if (ld.getLoadFunc() == null
+ || !(ld.getLoadFunc() instanceof LoadMetadata)
+ || ld.getLFile() == null
+ || ld.getLFile().getFileName() == null) {
+ return 0;
}
+
+ ResourceStatistics statistics =
+ ((LoadMetadata) ld.getLoadFunc())
+ .getStatistics(ld.getLFile().getFileName(), job);
+
+ // read the size once; getSizeInBytes() recomputes it on each call
+ Long sizeInBytes = statistics == null ? null : statistics.getSizeInBytes();
+ if (sizeInBytes == null) {
+ return 0;
+ }
+
+ return sizeInBytes;
}
-
+
public static class PigSecondaryKeyGroupComparator extends WritableComparator {
public PigSecondaryKeyGroupComparator() {
// super(TupleFactory.getInstance().tupleClass(), true);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=1301787&r1=1301786&r2=1301787&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Fri Mar 16 22:06:42 2012
@@ -137,7 +137,8 @@ public class SampleOptimizer extends MRO
Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
int rp = 1;
try {
- rp = JobControlCompiler.estimateNumberOfReducers(conf, lds);
+ rp = JobControlCompiler.estimateNumberOfReducers(
+ conf, lds, new org.apache.hadoop.mapreduce.Job(conf));
} catch (IOException e) {
log.warn("Failed to estimate number of reducers", e);
}
Modified: pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1301787&r1=1301786&r2=1301787&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java Fri Mar 16 22:06:42 2012
@@ -30,8 +30,12 @@ import org.apache.hadoop.mapreduce.JobID
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.pig.Expression;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.data.Tuple;
@@ -49,7 +53,7 @@ import org.apache.pig.impl.plan.Operator
* 1) construct an object using the constructor
* 2) Call getNext() in a loop till it returns null
*/
-public class ReadToEndLoader extends LoadFunc {
+public class ReadToEndLoader extends LoadFunc implements LoadMetadata {
/**
* the wrapped LoadFunc which will do the actual reading
@@ -58,7 +62,7 @@ public class ReadToEndLoader extends Loa
/**
* the Configuration object used to locate the input location - this will
- * be used to call {@link LoadFunc#setLocation(String, Configuration)} on
+ * be used to call {@link LoadFunc#setLocation(String, Job)} on
* the wrappedLoadFunc
*/
private Configuration conf;
@@ -257,6 +261,37 @@ public class ReadToEndLoader extends Loa
//no-op
}
+ /**
+ * Forwards the schema request to the wrapped loader.
+ * @return the wrapped loader's schema, or null if it is not a LoadMetadata
+ */
+ @Override
+ public ResourceSchema getSchema(String location, Job job) throws IOException {
+ if (!(wrappedLoadFunc instanceof LoadMetadata)) {
+ return null;
+ }
+ LoadMetadata metadata = (LoadMetadata) wrappedLoadFunc;
+ return metadata.getSchema(location, job);
+ }
+
+ /**
+ * Forwards the statistics request to the wrapped loader.
+ * @return the wrapped loader's statistics, or null if it is not a LoadMetadata
+ */
+ @Override
+ public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+ if (!(wrappedLoadFunc instanceof LoadMetadata)) {
+ return null;
+ }
+ LoadMetadata metadata = (LoadMetadata) wrappedLoadFunc;
+ return metadata.getStatistics(location, job);
+ }
-
+ /**
+ * Forwards the partition-key request to the wrapped loader.
+ * @return the wrapped loader's partition keys, or null if it is not a LoadMetadata
+ */
+ @Override
+ public String[] getPartitionKeys(String location, Job job) throws IOException {
+ if (!(wrappedLoadFunc instanceof LoadMetadata)) {
+ return null;
+ }
+ LoadMetadata metadata = (LoadMetadata) wrappedLoadFunc;
+ return metadata.getPartitionKeys(location, job);
+ }
+
+ /**
+ * Passes the partition filter on to the wrapped loader; a no-op when the
+ * wrapped loader is not a LoadMetadata.
+ */
+ @Override
+ public void setPartitionFilter(Expression partitionFilter) throws IOException {
+ if (!(wrappedLoadFunc instanceof LoadMetadata)) {
+ return;
+ }
+ LoadMetadata metadata = (LoadMetadata) wrappedLoadFunc;
+ metadata.setPartitionFilter(partitionFilter);
+ }
}
Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1301787&r1=1301786&r2=1301787&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Fri Mar 16 22:06:42 2012
@@ -32,6 +32,7 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -309,6 +310,21 @@ public class Utils {
return in;
}
}
-
+
+ /**
+ * Returns the total number of bytes for this path: the file's length if it
+ * is a file or, if it is a directory, the sum of the lengths of all files
+ * beneath it (recursively).
+ *
+ * @param fs filesystem that {@code status} belongs to
+ * @param status status of the file or directory to measure
+ * @return size in bytes
+ * @throws IOException on error listing a directory
+ */
+ public static long getPathLength(FileSystem fs, FileStatus status) throws IOException {
+ if (!status.isDir()) {
+ return status.getLen();
+ }
+ FileStatus[] children = fs.listStatus(status.getPath());
+ if (children == null) {
+ // older Hadoop FileSystem.listStatus returns null rather than throwing
+ // when the path no longer exists (e.g. removed after being globbed)
+ return 0;
+ }
+ long size = 0;
+ for (FileStatus child : children) {
+ size += getPathLength(fs, child);
+ }
+ return size;
+ }
}
Added: pig/trunk/test/org/apache/pig/test/PigStorageWithStatistics.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/PigStorageWithStatistics.java?rev=1301787&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/PigStorageWithStatistics.java (added)
+++ pig/trunk/test/org/apache/pig/test/PigStorageWithStatistics.java Fri Mar 16 22:06:42 2012
@@ -0,0 +1,50 @@
+package org.apache.pig.test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.util.UriUtil;
+import org.apache.pig.impl.util.Utils;
+
+import java.io.IOException;
+
+/**
+ * PigStorage that also reports the input size through
+ * {@link #getStatistics(String, Job)}, for tests of size-based estimation.
+ */
+public class PigStorageWithStatistics extends PigStorage {
+ /** Location from the most recent {@link #setLocation(String, Job)} call. */
+ private String loc = null;
+
+ @Override
+ public void setLocation(String location, Job job)
+ throws IOException {
+ super.setLocation(location, job);
+ loc = location;
+ }
+
+ /**
+ * Reports the size of {@code location} in whole megabytes. Falls back to the
+ * location given to setLocation when {@code location} is null.
+ */
+ @Override
+ public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+ ResourceStatistics stats = new ResourceStatistics();
+ Configuration conf = job == null ? new Configuration() : job.getConfiguration();
+ stats.setmBytes(getInputmBytes(location == null ? loc : location, conf));
+ return stats;
+ }
+
+ /**
+ * Sums the lengths of everything matched by the (possibly comma separated)
+ * location and converts the total to whole megabytes (truncating).
+ */
+ private Long getInputmBytes(String location, Configuration conf) throws IOException {
+ if (location == null) {
+ return 0L;
+ }
+
+ long inputBytes = 0L;
+ for (String pathString : getPathStrings(location)) {
+ Path path = new Path(pathString);
+ FileSystem fs = path.getFileSystem(conf);
+ FileStatus[] status = fs.globStatus(path);
+ if (status != null) {
+ for (FileStatus s : status) {
+ inputBytes += Utils.getPathLength(fs, s);
+ }
+ }
+ }
+ return inputBytes / 1024 / 1024;
+ }
+}
Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1301787&r1=1301786&r2=1301787&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Fri Mar 16 22:06:42 2012
@@ -7,6 +7,7 @@ import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
+import java.io.RandomAccessFile;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
@@ -24,22 +25,30 @@ import javax.tools.JavaFileObject;
import javax.tools.StandardJavaFileManager;
import javax.tools.ToolProvider;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.junit.Assert;
import org.junit.Test;
public class TestJobControlCompiler {
+ private static final Configuration CONF = new Configuration();
+
/**
* specifically tests that REGISTERED jars get added to distributed cache instead of merged into
* the job jar
@@ -77,8 +86,7 @@ public class TestJobControlCompiler {
PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
pigContext.connect();
pigContext.addJar(tmpFile.getAbsolutePath());
- Configuration conf = new Configuration();
- JobControlCompiler jobControlCompiler = new JobControlCompiler(pigContext, conf);
+ JobControlCompiler jobControlCompiler = new JobControlCompiler(pigContext, CONF);
MROperPlan plan = new MROperPlan();
MapReduceOper mro = new MapReduceOper(new OperatorKey());
mro.UDFs = new HashSet<String>();
@@ -107,6 +115,68 @@ public class TestJobControlCompiler {
}
+ @Test
+ public void testEstimateNumberOfReducers() throws Exception {
+ // expected counts are the input size divided by the default
+ // bytes-per-reducer (1000*1000*1000), rounded up
+ long justUnderTwoGb = 2L * 1000 * 1000 * 999;
+ long exactlyTwoGb = 2L * 1000 * 1000 * 1000;
+ long justOverTwoGb = 2L * 1000 * 1000 * 1001;
+
+ Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(CONF,
+ Lists.newArrayList(createPOLoadWithSize(justUnderTwoGb, new PigStorage())),
+ new org.apache.hadoop.mapreduce.Job(CONF)));
+
+ Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(CONF,
+ Lists.newArrayList(createPOLoadWithSize(exactlyTwoGb, new PigStorage())),
+ new org.apache.hadoop.mapreduce.Job(CONF)));
+
+ Assert.assertEquals(3, JobControlCompiler.estimateNumberOfReducers(CONF,
+ Lists.newArrayList(createPOLoadWithSize(justOverTwoGb, new PigStorage())),
+ new org.apache.hadoop.mapreduce.Job(CONF)));
+ }
+
+ @Test
+ public void testGetInputSizeFromFs() throws Exception {
+ final long size = 2L * 1024 * 1024 * 1024;
+
+ // loader without statistics: size is looked up on the filesystem
+ long plain = JobControlCompiler.getInputSize(CONF,
+ Lists.newArrayList(createPOLoadWithSize(size, new PigStorage())),
+ new org.apache.hadoop.mapreduce.Job(CONF));
+ Assert.assertEquals(size, plain);
+
+ // loader with statistics: size is reported by the loader
+ long reported = JobControlCompiler.getInputSize(CONF,
+ Lists.newArrayList(createPOLoadWithSize(size, new PigStorageWithStatistics())),
+ new org.apache.hadoop.mapreduce.Job(CONF));
+ Assert.assertEquals(size, reported);
+
+ // mixed inputs are summed
+ long mixed = JobControlCompiler.getInputSize(CONF,
+ Lists.newArrayList(
+ createPOLoadWithSize(size, new PigStorage()),
+ createPOLoadWithSize(size, new PigStorageWithStatistics())),
+ new org.apache.hadoop.mapreduce.Job(CONF));
+ Assert.assertEquals(size * 2, mixed);
+ }
+
+ @Test
+ public void testGetInputSizeFromLoader() throws Exception {
+ long size = 2L * 1024 * 1024 * 1024;
+ POLoad load = createPOLoadWithSize(size, new PigStorageWithStatistics());
+ org.apache.hadoop.mapreduce.Job job = new org.apache.hadoop.mapreduce.Job(CONF);
+ // size comes from the loader's LoadMetadata statistics
+ Assert.assertEquals(size, JobControlCompiler.getInputSizeFromLoader(load, job));
+ }
+
+ /**
+ * Creates a POLoad over a new temporary file whose length is {@code size}
+ * bytes. The file is scheduled for deletion when the JVM exits.
+ */
+ private static POLoad createPOLoadWithSize(long size, LoadFunc loadFunc) throws Exception {
+ File file = File.createTempFile("tempFile", ".tmp");
+ file.deleteOnExit();
+ RandomAccessFile f = new RandomAccessFile(file, "rw");
+ try {
+ // set the length directly instead of writing size bytes
+ f.setLength(size);
+ } finally {
+ // release the file descriptor (it was previously leaked on every call)
+ f.close();
+ }
+
+ loadFunc.setLocation(file.getAbsolutePath(), new org.apache.hadoop.mapreduce.Job(CONF));
+ FuncSpec funcSpec = new FuncSpec(loadFunc.getClass().getCanonicalName());
+ POLoad poLoad = new POLoad(new OperatorKey(), loadFunc);
+ poLoad.setLFile(new FileSpec(file.getAbsolutePath(), funcSpec));
+ poLoad.setPc(new PigContext());
+ poLoad.setUp();
+
+ return poLoad;
+ }
+
/**
* checks if the given file name is in the jar
* @param jarFile the jar to check