Posted to commits@pig.apache.org by ol...@apache.org on 2008/02/01 04:11:39 UTC

svn commit: r617338 [2/5] - in /incubator/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/ src/org/apache/pig/backend/datastorage/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/ src/org/apache/pig/backend/ha...

Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,191 @@
+
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.pig.backend.datastorage.*;
+
+public abstract class HPath implements ElementDescriptor {
+
+    protected Path path;
+    protected HDataStorage fs;
+
+    public HPath(HDataStorage fs, Path parent, Path child) {
+        this.path = new Path(parent, child);
+        this.fs = fs;
+    }
+
+    public HPath(HDataStorage fs, String parent, String child) {
+        this(fs, new Path(parent), new Path(child));
+    }
+    
+    public HPath(HDataStorage fs, Path parent, String child) {
+        this(fs, parent, new Path(child));
+    }
+
+    public HPath(HDataStorage fs, String parent, Path child) {
+        this(fs, new Path(parent), child);
+    }
+        
+    public HPath(HDataStorage fs, String pathString) {
+        this(fs, new Path(pathString));
+    }
+        
+    public HPath(HDataStorage fs, Path path) {
+        this.path = path;
+        this.fs = fs;
+    }
+
+    @Override
+    public DataStorage getDataStorage() {
+        return fs;
+    }
+    
+    @Override
+    public abstract OutputStream create(Properties configuration) 
+             throws IOException;
+    
+    @Override
+    public void copy(ElementDescriptor dstName,
+    				 Properties dstConfiguration,
+                     boolean removeSrc)
+        	throws IOException {
+    	FileSystem srcFS = this.fs.getHFS();
+    	FileSystem dstFS = ((HPath)dstName).fs.getHFS();
+    	
+    	Path srcPath = this.path;
+    	Path dstPath = ((HPath)dstName).path;
+    	
+    	boolean result = FileUtil.copy(srcFS,
+    								   srcPath,
+    								   dstFS,
+    								   dstPath,
+    								   false,
+    								   new Configuration());
+    	
+    	if (!result) {
+    		throw new IOException("Failed to copy from: " + this.toString() +
+    							  " to: " + dstName.toString());
+    	}
+    }
+    
+    @Override
+    public abstract InputStream open() throws IOException;
+
+    @Override
+    public abstract SeekableInputStream sopen() throws IOException;
+
+    @Override
+    public boolean exists() throws IOException {
+        return fs.getHFS().exists(path);
+    }
+    
+    @Override
+    public void rename(ElementDescriptor newName) 
+             throws IOException {
+        if (newName != null) {
+            fs.getHFS().rename(path, ((HPath)newName).path);
+        }
+    }
+
+    @Override
+    public void delete() throws IOException {
+    	// the file is removed and not placed in the trash bin
+        fs.getHFS().delete(path);
+    }
+
+    @Override
+    public Properties getConfiguration() throws IOException {
+        HConfiguration props = new HConfiguration();
+
+        long blockSize = fs.getHFS().getFileStatus(path).getBlockSize();
+
+        short replication = fs.getHFS().getFileStatus(path).getReplication();
+        
+        props.setProperty(BLOCK_SIZE_KEY, (new Long(blockSize)).toString());
+        props.setProperty(BLOCK_REPLICATION_KEY, (new Short(replication)).toString());
+        
+        return props;
+    }
+
+    @Override
+    public void updateConfiguration(Properties newConfig) throws IOException {
+        if (newConfig == null) {
+            return;
+        }
+        
+        String blkReplStr = newConfig.getProperty(BLOCK_REPLICATION_KEY);
+        
+        // only touch the replication factor if one was actually supplied
+        if (blkReplStr != null) {
+            fs.getHFS().setReplication(path, Short.parseShort(blkReplStr));
+        }
+    }
+
+    @Override
+    public Map<String, Object> getStatistics() throws IOException {
+        HashMap<String, Object> props = new HashMap<String, Object>();
+        
+        Long length = new Long(fs.getHFS().getFileStatus(path).getLen());
+
+        Long modificationTime = new Long(fs.getHFS().getFileStatus(path).
+                                         getModificationTime());
+
+        props.put(LENGTH_KEY, length.toString());
+        props.put(MODIFICATION_TIME_KEY, modificationTime.toString());
+        
+        return props;
+    }
+
+    @Override
+    public OutputStream create() throws IOException {
+        return create(null);
+    }
+
+    @Override
+    public void copy(ElementDescriptor dstName,
+                     boolean removeSrc) 
+            throws IOException {
+        copy(dstName, null, removeSrc);
+    }
+    
+    public Path getPath() {
+        return path;
+    }
+    
+    public FileSystem getHFS() {
+        return fs.getHFS();
+    }
+    
+    @Override
+    public String toString() {
+        return path.toString();
+    }
+    
+    @Override
+    public boolean equals(Object obj) {
+        if (! (obj instanceof HPath)) {
+            return false;
+        }
+        
+        return this.path.equals(((HPath)obj).path);  
+    }
+    
+    @Override
+    public int compareTo(ElementDescriptor other) {
+        return path.compareTo(((HPath)other).path);
+    }
+    
+    @Override
+    public int hashCode() {
+        return this.path.hashCode();
+    }
+}
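
For orientation (illustrative, not part of this commit): the abstract create(), open()
and sopen() methods are expected to be backed by the wrapped Hadoop FileSystem, which
HPath exposes through getHFS(). A minimal sketch of that mapping for create() and
open(), assuming only the accessors shown above; the helper class name is hypothetical:

    package org.apache.pig.backend.hadoop.datastorage;

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    // Hypothetical helper showing how a concrete HPath element would typically
    // implement its stream methods in terms of the underlying Hadoop FileSystem.
    public class HPathStreamsSketch {

        public static OutputStream create(HPath p) throws IOException {
            // FileSystem.create(Path) creates missing parent directories and
            // overwrites an existing file by default.
            return p.getHFS().create(p.getPath());
        }

        public static InputStream open(HPath p) throws IOException {
            return p.getHFS().open(p.getPath());
        }
    }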

Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,94 @@
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+
+public class HSeekableInputStream extends SeekableInputStream {
+
+    protected FSDataInputStream input;
+    protected long contentLength;
+    
+    HSeekableInputStream(FSDataInputStream input,
+                         long contentLength) {
+        this.input = input;
+        this.contentLength = contentLength;
+    }
+    
+    @Override
+    public void seek(long offset, FLAGS whence) throws IOException {
+        long targetPos;
+        
+        switch (whence) {
+        case SEEK_SET: {
+            targetPos = offset;
+            break;
+        }
+        case SEEK_CUR: {
+            targetPos = input.getPos() + offset;
+            break;
+        }
+        case SEEK_END: {
+            targetPos = contentLength + offset;
+            break;
+        }
+        default: {
+            throw new IOException("Invalid seek option: " + whence);
+        }
+        }
+        
+        input.seek(targetPos);
+    }
+    
+    @Override
+    public long tell() throws IOException {
+        return input.getPos();
+    }
+    
+    @Override
+    public int read() throws IOException {
+        return input.read();
+    }
+    
+    @Override
+    public int read(byte[] b) throws IOException {
+        return input.read(b);
+    }
+        
+    @Override
+    public int read(byte[] b, int off, int len ) throws IOException {
+        return input.read(b, off, len);
+    }
+    
+    @Override
+    public int available() throws IOException {
+        return input.available();
+    }
+    
+    @Override
+    public long skip(long n) throws IOException {
+        return input.skip(n);
+    }
+    
+    @Override
+    public void close() throws IOException {
+        input.close();
+    }
+    
+    @Override
+    public void mark(int readlimit) {
+        input.mark(readlimit);
+    }
+    
+    @Override
+    public void reset() throws IOException {
+        input.reset();
+    }
+    
+    @Override
+    public boolean markSupported() {
+        return input.markSupported();
+    }
+}
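
A usage note (illustrative, not part of this commit): seek() interprets its offset
relative to SEEK_SET, SEEK_CUR or SEEK_END, so a negative offset combined with
SEEK_END addresses the tail of the stream. A small sketch, assuming FLAGS is the
nested enum on SeekableInputStream used by seek() above; it sits in the same package
because the constructor is package-private:

    package org.apache.pig.backend.hadoop.datastorage;

    import java.io.IOException;

    import org.apache.pig.backend.datastorage.SeekableInputStream;

    // Illustrative only: reads the last `tail` bytes of the file behind an HPath.
    public class SeekTailSketch {

        public static byte[] readTail(HPath file, int tail) throws IOException {
            long length = file.getHFS().getFileStatus(file.getPath()).getLen();
            HSeekableInputStream in =
                new HSeekableInputStream(file.getHFS().open(file.getPath()), length);
            try {
                // Position the stream `tail` bytes before end-of-file.
                in.seek(-tail, SeekableInputStream.FLAGS.SEEK_END);
                byte[] buf = new byte[tail];
                int off = 0;
                while (off < tail) {
                    int n = in.read(buf, off, tail - off);
                    if (n < 0) {
                        break;   // hit EOF early; buffer is only partially filled
                    }
                    off += n;
                }
                return buf;
            } finally {
                in.close();
            }
        }
    }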

Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,476 @@
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketImplFactory;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Enumeration;
+
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.backend.executionengine.*;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+
+import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
+import org.apache.pig.backend.local.executionengine.LocalResult;
+import org.apache.pig.data.Tuple;
+
+import org.apache.log4j.Logger;
+
+import org.apache.pig.shock.SSHSocketImplFactory;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobSubmissionProtocol;
+import org.apache.hadoop.mapred.JobClient;
+
+
+public class HExecutionEngine implements ExecutionEngine {
+    
+    protected PigContext pigContext;
+    
+    protected Logger logger;
+    protected DataStorage ds;
+    protected HConfiguration conf;
+    
+    protected JobSubmissionProtocol jobTracker;
+    protected JobClient jobClient;
+
+    // key: the operator key from the logical plan that originated the physical plan
+    // val: the operator key for the root of the physical plan
+    protected Map<OperatorKey, OperatorKey> logicalToPhysicalKeys;
+    
+    protected Map<OperatorKey, ExecPhysicalOperator> physicalOpTable;
+    
+    // map from LOGICAL key to info about the execution
+    protected Map<OperatorKey, MapRedResult> materializedResults;
+    
+    public HExecutionEngine(PigContext pigContext,
+                            Logger logger,
+                            HConfiguration conf) {
+        this.pigContext = pigContext;
+        this.logger = logger;
+        this.conf = conf;
+        this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
+        this.physicalOpTable = new HashMap<OperatorKey, ExecPhysicalOperator>();
+        this.materializedResults = new HashMap<OperatorKey, MapRedResult>();
+        
+        this.ds = null;
+        
+        // to be set in the init method
+        this.jobTracker = null;
+        this.jobClient = null;
+    }
+    
+    public JobClient getJobClient() {
+        return this.jobClient;
+    }
+    
+    public Map<OperatorKey, MapRedResult> getMaterializedResults() {
+    	return this.materializedResults;
+    }
+    
+    public HExecutionEngine(PigContext pigContext, Logger logger) {
+        this(pigContext, logger, new HConfiguration(new JobConf()));
+    }
+                            
+    public Map<OperatorKey, ExecPhysicalOperator> getPhysicalOpTable() {
+        return this.physicalOpTable;
+    }
+    
+    
+    public DataStorage getDataStorage() {
+        return this.ds;
+    }
+    
+    private void setJobtrackerLocation(String newLocation) {
+        conf.put("mapred.job.tracker", newLocation);
+    }
+
+    private void setFilesystemLocation(String newLocation) {
+        conf.put("fs.default.name", newLocation);
+    }
+
+    @Override
+    public void init() throws ExecException {
+        //First set the ssh socket factory
+        setSSHFactory();
+        
+        String hodServer = System.getProperty("hod.server");
+    
+        if (hodServer != null && hodServer.length() > 0) {
+            String hdfsAndMapred[] = doHod(hodServer);
+            setFilesystemLocation(hdfsAndMapred[0]);
+            setJobtrackerLocation(hdfsAndMapred[1]);
+        }
+        else {
+            String cluster = System.getProperty("cluster");
+            if (cluster != null && cluster.length() > 0) {
+                if(cluster.indexOf(':') < 0) {
+                    cluster = cluster + ":50020";
+                }
+                setJobtrackerLocation(cluster);
+            }
+
+            String nameNode = System.getProperty("namenode");
+            if (nameNode!=null && nameNode.length() > 0) {
+                if(nameNode.indexOf(':') < 0) {
+                    nameNode = nameNode + ":8020";
+                }
+                setFilesystemLocation(nameNode);
+            }
+        }
+     
+        logger.info("Connecting to Hadoop file system at: " + conf.get("fs.default.name"));
+
+        try {
+            ds = new HDataStorage(conf);
+        }
+        catch (IOException e) {
+            throw new ExecException("Failed to create DataStorage", e);
+        }
+            
+        logger.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker"));
+        
+        try {
+            jobTracker = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+                                                              JobSubmissionProtocol.versionID, 
+                                                              JobTracker.getAddress(conf.getConfiguration()),
+                                                              conf.getConfiguration());
+        }
+        catch (IOException e) {
+            throw new ExecException("Failed to create job tracker", e);
+        }
+
+        try {
+            jobClient = new JobClient(new JobConf(conf.getConfiguration()));
+        }
+        catch (IOException e) {
+            throw new ExecException("Failed to create job client", e);
+        }
+    }
+
+    @Override
+    public void close() throws ExecException {
+        ;
+    }
+        
+    @Override
+    public Properties getConfiguration() throws ExecException {
+        return this.conf;
+    }
+        
+    @Override
+    public void updateConfiguration(Properties newConfiguration) 
+        	throws ExecException {
+    	Enumeration keys = newConfiguration.propertyNames();
+    	
+    	while (keys.hasMoreElements()) {
+    		Object obj = keys.nextElement();
+    		
+    		if (obj instanceof String) {
+    			String str = (String) obj;
+    			
+    			conf.put(str, newConfiguration.get(str));
+    		}
+    	}
+    }
+        
+    @Override
+    public Map<String, Object> getStatistics() throws ExecException {
+    	throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ExecPhysicalPlan compile(ExecLogicalPlan plan,
+                                               Properties properties)
+            throws ExecException {
+        return compile(new ExecLogicalPlan[] { plan },
+                       properties);
+    }
+
+    @Override
+    public ExecPhysicalPlan compile(ExecLogicalPlan[] plans,
+                                               Properties properties)
+            throws ExecException {
+        if (plans == null) {
+            throw new ExecException("No Plans to compile");
+        }
+
+        OperatorKey physicalKey = null;
+        for (int i = 0; i < plans.length; ++i) {
+            ExecLogicalPlan curPlan = null;
+
+            curPlan = plans[ i ];
+     
+            OperatorKey logicalKey = curPlan.getRoot();
+            
+            physicalKey = logicalToPhysicalKeys.get(logicalKey);
+            
+            if (physicalKey == null) {
+                try {
+                    physicalKey = new MapreducePlanCompiler(pigContext).
+                                      compile(curPlan.getRoot(),
+                                              curPlan.getOpTable(),
+                                              this);
+                }
+                catch (IOException e) {
+                    throw new ExecException("Failed to compile plan (" + i + ") " + logicalKey,
+                                                       e);
+                }
+                
+                logicalToPhysicalKeys.put(logicalKey, physicalKey);
+            }            
+        }
+        
+        return new MapRedPhysicalPlan(physicalKey, physicalOpTable);
+    }
+
+    @Override
+    public ExecJob execute(ExecPhysicalPlan plan) 
+            throws ExecException {
+
+        POMapreduce pom = (POMapreduce) physicalOpTable.get(plan.getRoot());
+
+        MapReduceLauncher.initQueryStatus(pom.numMRJobs());  // initialize status, for bookkeeping purposes.
+        MapReduceLauncher.setConf(this.conf.getConfiguration());
+        MapReduceLauncher.setExecEngine(this);
+        
+        // if the final operator is a MapReduce with no output file, then send to a temp
+        // file.
+        if (pom.outputFileSpec==null) {
+            try {
+                pom.outputFileSpec = new FileSpec(FileLocalizer.getTemporaryPath(null, pigContext).toString(),
+                                                  BinStorage.class.getName());
+            }
+            catch (IOException e) {
+                throw new ExecException("Failed to obtain temp file for " + plan.getRoot().toString(), e);
+            }
+        }
+
+        try {
+            pom.open();
+            
+            Tuple t;
+            while ((t = (Tuple) pom.getNext()) != null) {
+                ;
+            }
+            
+            pom.close();
+            
+            this.materializedResults.put(pom.sourceLogicalKey,
+            							 new MapRedResult(pom.outputFileSpec,
+            									 		  pom.reduceParallelism));
+        }
+        catch (IOException e) {
+            throw new ExecException(e);
+        }
+        
+        return new HJob(JOB_STATUS.COMPLETED, pigContext, pom.outputFileSpec);
+
+    }
+
+    @Override
+    public ExecJob submit(ExecPhysicalPlan plan) throws ExecException {
+    	throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Collection<ExecJob> runningJobs(Properties properties) throws ExecException {
+    	throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public Collection<String> activeScopes() throws ExecException {
+    	throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public void reclaimScope(String scope) throws ExecException {
+    	throw new UnsupportedOperationException();
+    }
+    
+    private void setSSHFactory(){
+        String g = System.getProperty("ssh.gateway");
+        if (g == null || g.length() == 0) return;
+        try {
+            Class clazz = Class.forName("org.apache.pig.shock.SSHSocketImplFactory");
+            SocketImplFactory f = (SocketImplFactory)clazz.getMethod("getFactory", new Class[0]).invoke(null, new Object[0]);
+            Socket.setSocketImplFactory(f);
+        } 
+        catch (SocketException e) {}
+        catch (Exception e){
+            throw new RuntimeException(e);
+        }
+    }
+
+    // Cache the HOD allocation so HOD is not re-run if the Pig server is constructed multiple times
+    private static String hodMapRed;
+    private static String hodHDFS;
+
+    private enum ParsingState {
+        NOTHING, HDFSUI, MAPREDUI, HDFS, MAPRED, HADOOPCONF
+    };
+    
+    private String[] doHod(String server) {
+        if (hodMapRed != null) {
+            return new String[] {hodHDFS, hodMapRed};
+        }
+        
+        try {
+            Process p = null;
+            // Make the kryptonite released version the default if nothing
+            // else is specified.
+            StringBuilder cmd = new StringBuilder();
+            cmd.append(System.getProperty("hod.expect.root"));
+            cmd.append('/');
+            cmd.append("libexec/pig/");
+            cmd.append(System.getProperty("hod.expect.uselatest"));
+            cmd.append('/');
+            cmd.append(System.getProperty("hod.command"));
+
+            String cluster = System.getProperty("yinst.cluster");
+           
+            // TODO This is a Yahoo specific holdover, need to remove
+            // this.
+            if (cluster != null && cluster.length() > 0 && !cluster.startsWith("kryptonite")) {
+                cmd.append(" --config=");
+                cmd.append(System.getProperty("hod.config.dir"));
+                cmd.append('/');
+                cmd.append(cluster);
+            }
+
+            cmd.append(" " + System.getProperty("hod.param", ""));
+
+            if (server.equals("local")) {
+                p = Runtime.getRuntime().exec(cmd.toString());
+            } 
+            else {
+                SSHSocketImplFactory fac = SSHSocketImplFactory.getFactory(server);
+                p = fac.ssh(cmd.toString());
+            }
+            
+            InputStream is = p.getInputStream();
+
+            logger.info("Connecting to HOD...");
+            logger.debug("sending HOD command " + cmd.toString());
+
+            StringBuffer sb = new StringBuffer();
+            int c;
+            String hdfsUI = null;
+            String mapredUI = null;
+            String hdfs = null;
+            String mapred = null;
+            String hadoopConf = null;
+
+            ParsingState current = ParsingState.NOTHING;
+
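+            // Scan the HOD client's output one character at a time.  Whenever one of
+            // the markers below ("hdfsUI:", "hdfs:", "mapredUI:", "mapred:",
+            // "hadoopConf:") is seen, remember which value is being read; the text up
+            // to the following newline is then captured into that value.  The loop
+            // stops once the JobTracker address ("mapred:") has been read.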
+            while((c = is.read()) != -1 && mapred == null) {
+                if (c == '\n' || c == '\r') {
+                    switch(current) {
+                    case HDFSUI:
+                        hdfsUI = sb.toString().trim();
+                        logger.info("HDFS Web UI: " + hdfsUI);
+                        break;
+                    case HDFS:
+                        hdfs = sb.toString().trim();
+                        logger.info("HDFS: " + hdfs);
+                        break;
+                    case MAPREDUI:
+                        mapredUI = sb.toString().trim();
+                        logger.info("JobTracker Web UI: " + mapredUI);
+                        break;
+                    case MAPRED:
+                        mapred = sb.toString().trim();
+                        logger.info("JobTracker: " + mapred);
+                        break;
+                    case HADOOPCONF:
+                        hadoopConf = sb.toString().trim();
+                        logger.info("HadoopConf: " + hadoopConf);
+                        break;
+                    }
+                    current = ParsingState.NOTHING;
+                    sb = new StringBuffer();
+                }
+                sb.append((char)c);
+                if (sb.indexOf("hdfsUI:") != -1) {
+                    current = ParsingState.HDFSUI;
+                    sb = new StringBuffer();
+                } 
+                else if (sb.indexOf("hdfs:") != -1) {
+                    current = ParsingState.HDFS;
+                    sb = new StringBuffer();
+                } 
+                else if (sb.indexOf("mapredUI:") != -1) {
+                    current = ParsingState.MAPREDUI;
+                    sb = new StringBuffer();
+                } 
+                else if (sb.indexOf("mapred:") != -1) {
+                    current = ParsingState.MAPRED;
+                    sb = new StringBuffer();
+                } 
+                else if (sb.indexOf("hadoopConf:") != -1) {
+                    current = ParsingState.HADOOPCONF;
+                    sb = new StringBuffer();
+                }    
+            }
+            
+            hdfsUI = fixUpDomain(hdfsUI);
+            hdfs = fixUpDomain(hdfs);
+            mapredUI = fixUpDomain(mapredUI);
+            mapred = fixUpDomain(mapred);
+            hodHDFS = hdfs;
+            hodMapRed = mapred;
+
+            if (hadoopConf != null) {
+                JobConf jobConf = new JobConf(hadoopConf);
+                jobConf.addResource("pig-cluster-hadoop-site.xml");
+                
+                conf = new HConfiguration(jobConf);
+                
+                // make sure that files on class path are used
+                System.out.println("Job Conf = " + conf);
+                System.out.println("dfs.block.size= " + conf.get("dfs.block.size"));
+                System.out.println("ipc.client.timeout= " + conf.get("ipc.client.timeout"));
+                System.out.println("mapred.child.java.opts= " + conf.get("mapred.child.java.opts"));
+            }
+            else {
+                throw new IOException("Missing Hadoop configuration file");
+            }
+            return new String[] {hdfs, mapred};
+        } 
+        catch (Exception e) {
+            logger.fatal("Could not connect to HOD", e);
+            System.exit(4);
+        }
+        throw new RuntimeException("Could not scrape needed information.");
+    }
+
+    private String fixUpDomain(String hostPort) throws UnknownHostException {
+        String parts[] = hostPort.split(":");
+        if (parts[0].indexOf('.') == -1) {
+            parts[0] = parts[0] + ".inktomisearch.com";
+        }
+        InetAddress.getByName(parts[0]);
+        return parts[0] + ":" + parts[1];
+    }
+    
+}
+
+
+
+
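
For orientation (illustrative, not part of this commit): init() reads its endpoints
from the "cluster" and "namenode" system properties (unless "hod.server" is set, in
which case HOD is used), appending the default ports :50020 and :8020 when none are
given. A minimal sketch of wiring the engine up by hand, assuming a PigContext and
log4j Logger are already available; the host names are placeholders:

    import java.util.Properties;

    import org.apache.log4j.Logger;

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
    import org.apache.pig.impl.PigContext;

    // Illustrative only: brings up an HExecutionEngine against a fixed cluster
    // instead of allocating one through HOD.
    public class EngineSetupSketch {

        public static HExecutionEngine connect(PigContext pigContext, Logger logger)
                throws ExecException {
            System.setProperty("cluster", "jobtracker.example.com:50020");
            System.setProperty("namenode", "namenode.example.com:8020");

            HExecutionEngine engine = new HExecutionEngine(pigContext, logger);
            engine.init();   // connects to HDFS and the JobTracker

            Properties conf = engine.getConfiguration();
            logger.info("fs.default.name = " + conf.get("fs.default.name"));
            return engine;
        }
    }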

Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,136 @@
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+
+
+public class HJob implements ExecJob {
+
+    protected JOB_STATUS status;
+    protected PigContext pigContext;
+    protected FileSpec outFileSpec;
+    
+    public HJob(JOB_STATUS status,
+                PigContext pigContext,
+                FileSpec outFileSpec) {
+        this.status = status;
+        this.pigContext = pigContext;
+        this.outFileSpec = outFileSpec;
+    }
+    
+    @Override
+    public JOB_STATUS getStatus() {
+        return status;
+    }
+    
+    @Override
+    public boolean hasCompleted() throws ExecException {
+        return true;
+    }
+    
+    @Override
+    public Iterator<Tuple> getResults() throws ExecException {
+        final LoadFunc p;
+        
+        try{
+             p = (LoadFunc)PigContext.instantiateFuncFromSpec(outFileSpec.getFuncSpec());
+
+             InputStream is = FileLocalizer.open(outFileSpec.getFileName(), pigContext);
+
+             p.bindTo(outFileSpec.getFileName(), new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+
+        }catch (Exception e){
+            throw new ExecException("Unable to get results for " + outFileSpec, e);
+        }
+        
+        return new Iterator<Tuple>() {
+            Tuple   t;
+            boolean atEnd;
+
+            public boolean hasNext() {
+                if (atEnd)
+                    return false;
+                try {
+                    if (t == null)
+                        t = p.getNext();
+                    if (t == null)
+                        atEnd = true;
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    t = null;
+                    atEnd = true;
+                }
+                return !atEnd;
+            }
+
+            public Tuple next() {
+                Tuple next = t;
+                if (next != null) {
+                    t = null;
+                    return next;
+                }
+                try {
+                    next = p.getNext();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+                if (next == null)
+                    atEnd = true;
+                return next;
+            }
+
+            public void remove() {
+                throw new RuntimeException("Removal not supported");
+            }
+
+        };
+    }
+
+    @Override
+    public Properties getContiguration() {
+        Properties props = new Properties();
+        return props;
+    }
+
+    @Override
+    public Map<String, Object> getStatistics() {
+    	throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void completionNotification(Object cookie) {
+        throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public void kill() throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public void getLogs(OutputStream log) throws ExecException {
+    	throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public void getSTDOut(OutputStream out) throws ExecException {
+    	throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public void getSTDError(OutputStream error) throws ExecException {
+    	throw new UnsupportedOperationException();
+    }
+}
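
Tying the pieces together (illustrative, not part of this commit): compile() turns a
logical plan into a MapRedPhysicalPlan, execute() runs it to completion and wraps the
output location in an HJob, and getResults() streams the stored tuples back. A sketch
assuming the ExecLogicalPlan and ExecJob interfaces from
org.apache.pig.backend.executionengine and an engine whose init() has already run:

    import java.util.Iterator;
    import java.util.Properties;

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.backend.executionengine.ExecJob;
    import org.apache.pig.backend.executionengine.ExecLogicalPlan;
    import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
    import org.apache.pig.data.Tuple;

    // Illustrative only: compile, run, and read back a plan with the classes above.
    public class RunPlanSketch {

        public static void run(HExecutionEngine engine, ExecLogicalPlan logicalPlan)
                throws ExecException {
            // Compile the logical plan into a chain of map-reduce operators.
            ExecPhysicalPlan physicalPlan = engine.compile(logicalPlan, new Properties());

            // Optionally dump the physical operator tree before running it.
            physicalPlan.explain(System.out);

            // execute() blocks until the underlying map-reduce jobs finish.
            ExecJob job = engine.execute(physicalPlan);

            // getResults() re-opens the job's output file and streams the tuples back.
            Iterator<Tuple> it = job.getResults();
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }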

Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedPhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedPhysicalPlan.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedPhysicalPlan.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedPhysicalPlan.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,56 @@
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
+import org.apache.pig.backend.local.executionengine.POVisitor;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+public class MapRedPhysicalPlan implements ExecPhysicalPlan {
+    private static final long serialVersionUID = 1;
+    
+    protected OperatorKey root;
+    protected Map<OperatorKey, ExecPhysicalOperator> opTable;    
+    
+    MapRedPhysicalPlan(OperatorKey root,
+                       Map<OperatorKey, ExecPhysicalOperator> opTable) {
+        this.root = root;
+        this.opTable = opTable;
+    }
+    
+    @Override
+    public Properties getConfiguration() {
+        // TODO
+        return null;
+    }
+
+    @Override
+    public void updateConfiguration(Properties configuration)
+        throws ExecException {
+        // TODO
+    }
+             
+    @Override
+    public void explain(OutputStream out) {
+        POVisitor lprinter = new POVisitor(new PrintStream(out));
+        
+        ((PhysicalOperator)opTable.get(root)).visit(lprinter, "");
+    }
+    
+    @Override
+    public Map<OperatorKey, ExecPhysicalOperator> getOpTable() {
+    	return opTable;
+    }
+    
+    @Override
+    public OperatorKey getRoot() {
+        return this.root;
+    }
+}
+

Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedResult.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedResult.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedResult.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedResult.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,15 @@
+package org.apache.pig.backend.hadoop.executionengine;
+
+import org.apache.pig.impl.io.FileSpec;
+
+public class MapRedResult {
+    public FileSpec outFileSpec;
+    public int parallelismRequest;
+    
+    public MapRedResult(FileSpec outFileSpec,
+    					int parallelismRequest) {
+        this.outFileSpec = outFileSpec;
+        this.parallelismRequest = parallelismRequest;
+    }
+}
+

Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.FunctionInstantiator;
+import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.RandomSampleLoader;
+import org.apache.pig.impl.eval.BinCondSpec;
+import org.apache.pig.impl.eval.ConstSpec;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.FilterSpec;
+import org.apache.pig.impl.eval.FuncEvalSpec;
+import org.apache.pig.impl.eval.GenerateSpec;
+import org.apache.pig.impl.eval.ProjectSpec;
+import org.apache.pig.impl.eval.CompositeEvalSpec;
+import org.apache.pig.impl.eval.MapLookupSpec;
+import org.apache.pig.impl.eval.SortDistinctSpec;
+import org.apache.pig.impl.eval.StarSpec;
+import org.apache.pig.impl.eval.EvalSpecVisitor;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOEval;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.impl.logicalLayer.LOUnion;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.physicalLayer.PlanCompiler;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.SortPartitioner;
+import org.apache.pig.backend.hadoop.datastorage.HFile;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
+
+// compiler for mapreduce physical plans
+public class MapreducePlanCompiler {
+
+    protected PigContext pigContext;
+    protected NodeIdGenerator nodeIdGenerator;
+
+    protected MapreducePlanCompiler(PigContext pigContext) {
+		this.pigContext = pigContext;
+		this.nodeIdGenerator = NodeIdGenerator.getGenerator();
+	}
+
+    private String getTempFile(PigContext pigcontext) throws IOException {
+        return FileLocalizer.getTemporaryPath(null,
+                                              pigContext).toString();
+    }
+    
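+    // Compiles the logical operator identified by logicalKey (and, recursively, its
+    // inputs) into POMapreduce operators registered in the engine's physical op table
+    // and returns the key of the physical operator that produces the final result.
+    // Roughly: a logical key with materialized results compiles to a job that re-reads
+    // them; LOEval and LOSplit are pushed into a copy of the upstream map-reduce job;
+    // LOCogroup and LOUnion start a new job fed by their compiled inputs; LOSplitOutput
+    // reads back the temp file written by its LOSplit input; LOLoad starts a job that
+    // reads the given file; LOStore sets the output file of the job compiled from its
+    // input; LOSort expands into two jobs (quantile estimation, then the sort itself).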
+	public OperatorKey compile(OperatorKey logicalKey, 
+	                           Map<OperatorKey, LogicalOperator> logicalOpTable, 
+	                           HExecutionEngine execEngine) throws IOException {
+	    
+	    // check to see if we have materialized results for the logical tree to
+	    // compile, if so, re-use them...
+		//
+		Map<OperatorKey, MapRedResult> materializedResults = execEngine.getMaterializedResults();
+		
+		MapRedResult materializedResult = materializedResults.get(logicalKey);
+	    
+		if (materializedResult != null) {
+			POMapreduce pom = new POMapreduce(logicalKey.getScope(),
+											  nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+											  execEngine.getPhysicalOpTable(),
+											  logicalKey,
+											  pigContext);
+
+			String filename = FileLocalizer.fullPath(materializedResult.outFileSpec.getFileName(), pigContext);
+			FileSpec fileSpec = new FileSpec(filename, materializedResult.outFileSpec.getFuncSpec());
+			pom.addInputFile(fileSpec);
+			pom.mapParallelism = Math.max(pom.mapParallelism, materializedResult.parallelismRequest);
+
+			return pom.getOperatorKey();			
+		}
+		
+        // first, compile inputs into MapReduce operators
+        OperatorKey[] compiledInputs = new OperatorKey[logicalOpTable.get(logicalKey).getInputs().size()];
+        
+        for (int i = 0; i < logicalOpTable.get(logicalKey).getInputs().size(); i++)
+            compiledInputs[i] = compile(logicalOpTable.get(logicalKey).getInputs().get(i),
+                                        logicalOpTable,
+                                        execEngine);
+        
+        // then, compile this operator; if possible, merge with previous MapReduce
+        // operator rather than introducing a new one
+        
+        LogicalOperator lo = logicalOpTable.get(logicalKey);
+        
+        if (lo instanceof LOEval) {
+            POMapreduce pom = ((POMapreduce)execEngine.getPhysicalOpTable().get(compiledInputs[0]))
+                                .copy(nodeIdGenerator.getNextNodeId(logicalKey.getScope())); // make a copy of the previous
+            
+            // need to do this because the copy is implemented via serialize/deserialize
+            // and the constructor is not invoked for pom.
+            execEngine.getPhysicalOpTable().put(pom.getOperatorKey(), pom);
+            
+            // MapReduce operator (NOTE: it's important that we make a copy rather than using it
+            // directly; this matters in the case of a "split")
+
+            pushInto((LOEval) lo, pom); // add the computation specified by "lo" to this mapreduce
+            // operator
+            
+            return pom.getOperatorKey();
+        } 
+        else if (lo instanceof LOCogroup) {
+            POMapreduce pom = new POMapreduce(logicalKey.getScope(),
+                                              nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+                                              execEngine.getPhysicalOpTable(),
+                                              logicalKey,
+                                              pigContext, 
+                                              -1, 
+                                              logicalOpTable.get(logicalKey).getRequestedParallelism());
+
+            pom.groupFuncs = (((LOCogroup) lo).getSpecs());
+            
+            connectInputs(compiledInputs, pom, execEngine.getPhysicalOpTable());
+
+            return pom.getOperatorKey();
+        }  
+        else if (lo instanceof LOSplitOutput){
+            POMapreduce child = (POMapreduce)execEngine.getPhysicalOpTable().get(compiledInputs[0]);
+            String fileName = child.toSplit.tempFiles.get(((LOSplitOutput)lo).getReadFrom());
+            POMapreduce pom = new POMapreduce(lo.getScope(),
+                                              nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+                                              execEngine.getPhysicalOpTable(),
+                                              logicalKey,
+                                              pigContext,
+                                              child.getOperatorKey());
+            FileSpec inputFileSpec = new FileSpec(fileName, BinStorage.class.getName());
+            pom.addInputFile(inputFileSpec);
+            return pom.getOperatorKey();
+        }
+        else if (lo instanceof LOSplit) {
+            //Make copy of previous operator
+        	POMapreduce pom = ((POMapreduce)execEngine.getPhysicalOpTable().get(compiledInputs[0])).
+        	                    copy(nodeIdGenerator.getNextNodeId(logicalKey.getScope()));
+        	
+            // need to do this because the copy is implemented via serialize/deserialize
+            // and the constructor is not invoked for pom.
+            execEngine.getPhysicalOpTable().put(pom.getOperatorKey(), pom);
+            
+            pom.toSplit = new SplitSpec((LOSplit)lo, pigContext);
+        	
+            // Technically, we don't need the output to be set in the split case,
+            // because nothing will go to the output. But other code assumes it's
+            // non-null, so we set it to a temp file.
+            FileSpec fileSpec = new FileSpec(getTempFile(pigContext), BinStorage.class.getName());
+            pom.outputFileSpec = fileSpec;
+        	return pom.getOperatorKey();
+        }
+        else if (lo instanceof LOLoad) {
+            POMapreduce pom = new POMapreduce(lo.getScope(),
+                                              nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+                                              execEngine.getPhysicalOpTable(),
+                                              logicalKey,
+                                              pigContext,
+                                              compiledInputs);
+            LOLoad loLoad = (LOLoad) lo;
+            String filename = FileLocalizer.fullPath(loLoad.getInputFileSpec().getFileName(), pigContext);
+            FileSpec fileSpec = new FileSpec(filename, loLoad.getInputFileSpec().getFuncSpec());
+            pom.addInputFile(fileSpec);
+            pom.mapParallelism = Math.max(pom.mapParallelism, lo.getRequestedParallelism());
+            return pom.getOperatorKey();
+        } 
+        else if (lo instanceof LOStore) {
+            LOStore los = (LOStore) lo;
+            ((POMapreduce)execEngine.getPhysicalOpTable().get(compiledInputs[0])).outputFileSpec = los.getOutputFileSpec();
+            
+            ((POMapreduce)execEngine.getPhysicalOpTable().get(compiledInputs[0])).sourceLogicalKey = 
+            	los.getInputs().get(0);
+            
+            return compiledInputs[0];
+        } 
+        else if (lo instanceof LOUnion) {
+            POMapreduce pom = new POMapreduce(lo.getScope(), 
+                                              nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+                                              execEngine.getPhysicalOpTable(),
+                                              logicalKey,
+                                              pigContext, 
+                                              -1, 
+                                              lo.getRequestedParallelism());
+            connectInputs(compiledInputs, pom, execEngine.getPhysicalOpTable());
+            return pom.getOperatorKey();
+        } 
+        else if (lo instanceof LOSort) {
+        	LOSort loSort = (LOSort) lo;
+        	//must break up into 2 map reduce jobs, one for gathering quantiles, another for sorting
+        	POMapreduce quantileJob = getQuantileJob(lo.getScope(), 
+        	                                         nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+        	                                         execEngine.getPhysicalOpTable(),
+        	                                         logicalKey,
+        	                                         (POMapreduce) (execEngine.getPhysicalOpTable().get(compiledInputs[0])), 
+        	                                         loSort);
+        	
+        	return getSortJob(lo.getScope(), 
+        	                  nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+        	                  execEngine.getPhysicalOpTable(),
+        	                  logicalKey,
+        	                  quantileJob, 
+        	                  loSort).getOperatorKey();
+        }
+        throw new IOException("Unknown logical operator.");
+    }
+    
+    // added for UNION:
+    // returns true iff okay to merge this operator with a subsequent binary op (e.g., co-group or union).
+    // this is the case iff (1) this operator doesn't do grouping (which requires its own reduce phase), and (2) this operator isn't itself a binary op
+    private boolean okayToMergeWithBinaryOp(POMapreduce mro) {
+        return (!mro.doesGrouping() && (mro.numInputFiles() == 1));
+    }
+    
+    private void connectInputs(OperatorKey[] compiledInputs, 
+                               POMapreduce pom,
+                               Map<OperatorKey, ExecPhysicalOperator> physicalOpTable) throws IOException {
+        // connect inputs (by merging operators, if possible; else connect via temp files)
+        for (int i = 0; i < compiledInputs.length; i++) {
+            if (okayToMergeWithBinaryOp((POMapreduce)physicalOpTable.get(compiledInputs[i]))) {
+                // can merge input i with this operator
+            	pom.addInputFile(((POMapreduce)physicalOpTable.get(compiledInputs[i])).getFileSpec(0), 
+            	                  ((POMapreduce)physicalOpTable.get(compiledInputs[i])).getEvalSpec(0));
+                pom.addInputOperators(((PhysicalOperator)physicalOpTable.get(compiledInputs[i])).inputs);
+            } else {
+                // chain together via a temp file
+                String tempFile = getTempFile(pigContext);
+                FileSpec fileSpec = new FileSpec( tempFile, BinStorage.class.getName());
+                ((POMapreduce)physicalOpTable.get(compiledInputs[i])).outputFileSpec = fileSpec;
+                pom.addInputFile(fileSpec);
+                pom.addInputOperator(compiledInputs[i]);
+            }
+        }
+    }
+
+    // push the function evaluated by "lo" into the map-reduce operator "mro"
+    private void pushInto(LOEval lo, POMapreduce mro) throws IOException {
+
+        if (!mro.doesGrouping()) { // push into "map" phase
+           
+            // changed for UNION:
+            for (int index = 0; index < mro.toMap.size(); index++) {
+                mro.addMapSpec(index, lo.getSpec());
+            }
+            //int index = mro.toMap.list.size() - 1;
+            //mro.toMap.list.get(index).add(lo.spec);
+            
+            mro.mapParallelism = Math.max(mro.mapParallelism, lo.getRequestedParallelism());
+
+        } else { // push into "reduce" phase
+            
+            EvalSpec spec = lo.getSpec();
+
+            if (mro.toReduce == null && shouldCombine(spec)) {
+                // Push this spec into the combiner.  But we also need to
+                // create a new spec with a changed expected projection to
+                // push into the reducer.
+
+                if (mro.toCombine != null) {
+                    throw new AssertionError("Combiner already set.");
+                }
+                // mro.toCombine = spec;
+
+                // Now, we need to adjust the expected projection for the
+                // eval spec(s) we just pushed.  Also, this will change the
+                // function to be the final instead of general instance.
+                EvalSpec newSpec = spec.copy(pigContext);
+                newSpec.visit(new ReduceAdjuster(pigContext));
+                mro.addReduceSpec(newSpec);
+
+                // Adjust the function name for the combine spec, to set it
+                // to the initial function instead of the general
+                // instance.  Make a copy of the eval spec rather than
+                // adjusting the existing one, to prevent breaking the 
+                // logical plan in case another physical plan is generated
+                // from it later.
+                EvalSpec combineSpec = spec.copy(pigContext);
+                combineSpec.visit(new CombineAdjuster());
+                mro.toCombine = combineSpec;
+
+            } else {
+                mro.addReduceSpec(lo.getSpec()); // otherwise, don't use combiner
+            }
+            
+            mro.reduceParallelism = Math.max(mro.reduceParallelism, lo.getRequestedParallelism());
+
+        }
+    }
+    
+    
+    private POMapreduce getQuantileJob(String scope,
+                                       long id,
+                                       Map<OperatorKey, ExecPhysicalOperator> physicalOpTable,
+                                       OperatorKey logicalKey,
+                                       POMapreduce input, 
+                                       LOSort loSort) throws IOException{
+    	//first the quantile job
+    	POMapreduce quantileJob = new POMapreduce(scope, 
+    											  id,
+    											  physicalOpTable,
+    											  logicalKey,
+    											  pigContext,
+    											  input.getOperatorKey());
+    	//first materialize the output of the previous stage
+    	String fileName = getTempFile(pigContext);
+    	input.outputFileSpec = new FileSpec(fileName,BinStorage.class.getName());
+    	
+    	//Load the output using a random sample load function
+    	FileSpec inputFileSpec = new FileSpec(fileName, RandomSampleLoader.class.getName());
+    	quantileJob.addInputFile(inputFileSpec);
+    	
+		quantileJob.addMapSpec(0, loSort.getSortSpec());
+    	
+		//Constructing the query structures by hand, quite ugly.
+		
+    	//group all
+    	ArrayList<EvalSpec> groupFuncs = new ArrayList<EvalSpec>();
+	
+    	groupFuncs.add(new GenerateSpec(new ConstSpec("all")).getGroupBySpec());
+	
+    	quantileJob.groupFuncs = groupFuncs;
+    	
+    	//find the quantiles in the reduce step
+    	ArrayList<EvalSpec> argsList = new ArrayList<EvalSpec>();
+    	argsList.add(new ConstSpec(Math.max(loSort.getRequestedParallelism()-1,1)));
+    	
+    	//sort the first column of the cogroup output and feed it to the quantiles function
+    	EvalSpec sortedSampleSpec = new ProjectSpec(1);
+    	sortedSampleSpec = sortedSampleSpec.addSpec(new SortDistinctSpec(false, new StarSpec()));
+    	argsList.add(sortedSampleSpec);
+    	
+    	EvalSpec args = new GenerateSpec(argsList);
+
+    	EvalSpec reduceSpec = new FuncEvalSpec(pigContext, FindQuantiles.class.getName(), args);
+    	reduceSpec.setFlatten(true);
+    	quantileJob.addReduceSpec(new GenerateSpec(reduceSpec));
+    	
+    	//a temporary file to hold the quantile data
+    	String quantileFile = getTempFile(pigContext);
+    	quantileJob.outputFileSpec = new FileSpec(quantileFile, BinStorage.class.getName());
+    	
+    	return quantileJob;
+    }
+    
+    public POMapreduce getSortJob(String scope, 
+                                  long id,
+                                  Map<OperatorKey, ExecPhysicalOperator> physicalOpTable,
+                                  OperatorKey logicalKey,
+                                  POMapreduce quantileJob, 
+                                  LOSort loSort) throws IOException{
+    	POMapreduce sortJob = new POMapreduce(scope,
+    										  id,
+    										  physicalOpTable,
+    										  logicalKey,
+    										  pigContext, 
+    										  quantileJob.getOperatorKey());
+    	
+    	sortJob.quantilesFile = quantileJob.outputFileSpec.getFileName();
+    	
+    	//same input as the quantile job, but the full BinStorage load function
+    	sortJob.addInputFile(new FileSpec(quantileJob.getFileSpec(0).getFileName(), BinStorage.class.getName()));
+    	
+    	ArrayList<EvalSpec> groupFuncs = new ArrayList<EvalSpec>();
+    	    	
+    	groupFuncs.add(new GenerateSpec(loSort.getSortSpec()).getGroupBySpec());
+    	
+    	sortJob.groupFuncs = groupFuncs;
+    	sortJob.partitionFunction = SortPartitioner.class;
+    	
+		ProjectSpec ps = new ProjectSpec(1);
+		ps.setFlatten(true);
+		sortJob.addReduceSpec(new GenerateSpec(ps));
+	
+    	sortJob.reduceParallelism = loSort.getRequestedParallelism();
+    	
+    	String comparatorFuncName = loSort.getSortSpec().getComparatorName();
+    	if (comparatorFuncName != null) {
+    		sortJob.userComparator =
+                (Class<WritableComparator>)PigContext.resolveClassName(
+                    comparatorFuncName);
+    	}
+
+    	return sortJob;
+    }
+
+    private boolean shouldCombine(EvalSpec spec) {
+        // Determine whether this is something we can combine or not.
+        // First, it must be a generate spec.
+        if (!(spec instanceof GenerateSpec)) {
+            return false;
+        }
+
+        GenerateSpec gen = (GenerateSpec)spec;
+
+        // Second, the first immediate child of the generate spec must be
+        // a project with a value of 0.
+        Iterator<EvalSpec> i = gen.getSpecs().iterator();
+        if (!i.hasNext()) return false;
+        EvalSpec s = i.next();
+        if (!(s instanceof ProjectSpec)) {
+            return false;
+        } else {
+            ProjectSpec p = (ProjectSpec)s;
+            if (p.numCols() > 1) return false;
+            else if (p.getCol() != 0) return false;
+        }
+
+        // Third, all subsequent immediate children of the generate spec
+        // must be func eval specs
+        while (i.hasNext()) {
+            s = i.next();
+            if (!(s instanceof FuncEvalSpec)) return false;
+        }
+
+        // Finally, walk the entire tree of the generate spec and see if we
+        // can combine it.
+        CombineDeterminer cd = new CombineDeterminer();
+        gen.visit(cd);
+        return cd.useCombiner();
+    }
+
+    private class ReduceAdjuster extends EvalSpecVisitor {
+        private int position = 0;
+        FunctionInstantiator instantiator = null;
+
+        public ReduceAdjuster(FunctionInstantiator fi) {
+            instantiator = fi;
+        }
+
+        public void visitGenerate(GenerateSpec g) {
+            Iterator<EvalSpec> i = g.getSpecs().iterator();
+            for (position = 0; i.hasNext(); position++) {
+                i.next().visit(this);
+            }
+        }
+        
+        public void visitFuncEval(FuncEvalSpec fe) {
+            // Need to replace our arg spec with a project of our position.
+            // DON'T visit our args, they're exactly what we're trying to
+            // lop off.
+            // The first ProjectSpec in the Composite is because the tuples
+            // will come out of the combiner in the form (groupkey,
+            // {(x, y, z)}).  The second ProjectSpec contains the offset of
+            // the projection element we're interested in.
+            CompositeEvalSpec cs = new CompositeEvalSpec(new ProjectSpec(1));
+            cs.addSpec(new ProjectSpec(position));
+            fe.setArgs(new GenerateSpec(cs));
+
+
+            // Reset the function to call the final instance of itself
+            // instead of the general instance.  Have to instantiate the
+            // function itself first so we can find out if it's algebraic
+            // or not.
+            try {
+                fe.instantiateFunc(instantiator);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            fe.resetFuncToFinal();
+        }
+    }
+
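+    /**
+     * Adjusts the combine-side spec: each function is reset to call its initial
+     * instance instead of the general instance.
+     */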
+    private class CombineAdjuster extends EvalSpecVisitor {
+        private int position = 0;
+
+        public void visitFuncEval(FuncEvalSpec fe) {
+            // Reset the function to call the initial instance of itself
+            // instead of the general instance.
+            fe.resetFuncToInitial();
+        }
+    }
+
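+    /**
+     * Walks an eval spec tree to decide whether the combiner should be used:
+     * a positive shouldCombine means yes, a negative value means no.
+     */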
+    private class CombineDeterminer extends EvalSpecVisitor {
+        private class FuncCombinable extends EvalSpecVisitor {
+            public boolean combinable = true;
+
+            @Override
+            public void visitBinCond(BinCondSpec bc) {
+                combinable = false;
+            }
+            
+            @Override
+            public void visitFilter(FilterSpec bc) {
+                combinable = false;
+            }
+
+            @Override
+            public void visitFuncEval(FuncEvalSpec bc) {
+                combinable = false;
+            }
+
+            @Override
+            public void visitSortDistinct(SortDistinctSpec bc) {
+                combinable = false;
+            }
+        };
+
+        private int shouldCombine = 0;
+
+        public boolean useCombiner() {
+            return shouldCombine > 0;
+        }
+
+        @Override
+        public void visitBinCond(BinCondSpec bc) {
+            // TODO Could be true if both branches are combinable.  But the logic in
+            // CombineAdjuster and ReduceAdjuster doesn't know how to handle
+            // binconds, so just say false for now.
+            shouldCombine = -1;
+        }
+
+        @Override
+        public void visitCompositeEval(CompositeEvalSpec ce) {
+            // If we've already determined we're not combinable, stop.
+            if (shouldCombine < 0) return;
+
+            for (EvalSpec spec: ce.getSpecs()) {
+                spec.visit(this);
+            }
+        }
+
+        // ConstSpec is a NOP, as it neither benefits from nor
+        // prevents combinability.
+        
+        @Override
+        public void visitFilter(FilterSpec f) {
+            shouldCombine = -1;
+        }
+
+        @Override
+        public void visitFuncEval(FuncEvalSpec fe) {
+            // Check the function's arguments to make sure they are
+            // combinable.
+            FuncCombinable fc = new FuncCombinable();
+            fe.getArgs().visit(fc);
+            if (!fc.combinable) {
+                shouldCombine = -1;
+                return;
+            }
+            
+            if (fe.combinable()) shouldCombine = 1;
+            else shouldCombine = -1;
+        }
+
+        @Override
+        public void visitGenerate(GenerateSpec g) {
+            // If we've already determined we're not combinable, stop.
+            if (shouldCombine < 0) return;
+
+            for (EvalSpec spec: g.getSpecs()) {
+                spec.visit(this);
+            }
+        }
+
+        // MapLookupSpec is a NOP, as it neither benefits from nor
+        // prevents combinability.
+        
+        // ProjectSpec is a NOP, as it neither benefits from nor
+        // prevents combinability.
+        
+        @Override
+        public void visitSortDistinct(SortDistinctSpec sd) {
+            shouldCombine = -1;
+        }
+
+        // StarSpec is a NOP, as it neither benefits from nor
+        // prevents combinability.
+    }
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
+import org.apache.pig.backend.local.executionengine.POVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.StarSpec;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.PigLogger;
+
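+/**
+ * Physical operator representing a single Hadoop MapReduce job. It carries the map,
+ * group, combine and reduce eval specs, the input and output file specs, optional
+ * split, sort and partitioning information, and the requested parallelism; open()
+ * hands the job to MapReduceLauncher for execution.
+ */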
+public class POMapreduce extends PhysicalOperator {
+	private static final long serialVersionUID = 1L;
+	
+    public ArrayList<EvalSpec> toMap = new ArrayList<EvalSpec>();
+    public EvalSpec toCombine = null;
+    public EvalSpec toReduce = null;
+    public ArrayList<EvalSpec> groupFuncs = null;
+    public SplitSpec toSplit = null;
+    public ArrayList<FileSpec> inputFileSpecs = new ArrayList<FileSpec>();
+    public FileSpec outputFileSpec = null;
+    public Class partitionFunction = null;
+    public Class<WritableComparator> userComparator = null;
+    public String quantilesFile = null;
+    public PigContext pigContext;
+
+    public OperatorKey sourceLogicalKey;
+
+    public int mapParallelism = -1;       // -1 means let hadoop decide
+    public int reduceParallelism = -1;
+
+    
+    static MapReduceLauncher mapReduceLauncher = new MapReduceLauncher();
+
+    public String getScope() {
+        return scope;
+    }
+    
+    public boolean doesGrouping() {
+        return groupFuncs != null;
+    }
+    
+    public boolean doesSplitting() {
+        return toSplit != null;
+    }
+    
+    public boolean doesSorting() {
+        return quantilesFile != null;
+    }
+
+    public int numInputFiles() {
+        if (inputFileSpecs == null)
+            return 0;
+        else
+            return inputFileSpecs.size();
+    }
+
+    public POMapreduce(String scope, 
+                       long id, 
+                       Map<OperatorKey, ExecPhysicalOperator> opTable,
+                       OperatorKey sourceLogicalKey,
+                       PigContext pigContext, 
+                       int mapParallelism, 
+                       int reduceParallelism) {
+        this(scope, id, opTable, sourceLogicalKey, pigContext);
+        this.mapParallelism = mapParallelism;
+        this.reduceParallelism = reduceParallelism;
+    }
+
+    public POMapreduce(String scope, 
+                       long id, 
+                       Map<OperatorKey, ExecPhysicalOperator> opTable,
+                       OperatorKey sourceLogicalKey,
+                       PigContext pigContext, 
+                       OperatorKey[] inputsIn) {
+        super(scope, id, opTable, LogicalOperator.FIXED);
+        this.sourceLogicalKey = sourceLogicalKey;
+    	this.pigContext = pigContext;
+        inputs = inputsIn;
+    }
+
+    public POMapreduce(String scope, 
+                       long id,
+                       Map<OperatorKey, ExecPhysicalOperator> opTable,
+                       OperatorKey sourceLogicalKey,
+                       PigContext pigContext, 
+                       OperatorKey inputIn) {
+        super(scope, id, opTable, LogicalOperator.FIXED);
+        this.sourceLogicalKey = sourceLogicalKey;
+    	this.pigContext = pigContext;
+        inputs = new OperatorKey[1];
+        inputs[0] = inputIn;
+    }
+
+    public POMapreduce(String scope, 
+                       long id, 
+                       Map<OperatorKey, ExecPhysicalOperator> opTable,
+                       OperatorKey sourceLogicalKey,
+                       PigContext pigContext) {
+        super(scope, id, opTable, LogicalOperator.FIXED);
+        this.sourceLogicalKey = sourceLogicalKey;
+    	this.pigContext = pigContext;
+        inputs = new OperatorKey[0];
+    }
+
+    public void addInputOperator(OperatorKey newInput) {
+        OperatorKey[] oldInputs = inputs;
+        inputs = new OperatorKey[oldInputs.length + 1];
+        for (int i = 0; i < oldInputs.length; i++)
+            inputs[i] = oldInputs[i];
+        inputs[inputs.length - 1] = newInput;
+    }
+    
+    public void addInputOperators(OperatorKey[] newInputs) {
+        for (int i = 0; i < newInputs.length; i++) {
+            addInputOperator(newInputs[i]);
+        }
+    }
+    
+    public void addInputFile(FileSpec fileSpec){
+    	addInputFile(fileSpec, new StarSpec());
+    }
+    
+    public void addInputFile(FileSpec fileSpec, EvalSpec evalSpec){
+    	inputFileSpecs.add(fileSpec);
+    	toMap.add(evalSpec);
+    }
+    
+    
+    @Override
+	public boolean open() throws IOException {
+        // first, call open() on all inputs
+        if (inputs != null) {
+            for (int i = 0; i < inputs.length; i++) {
+                if (!((PhysicalOperator)opTable.get(inputs[i])).open())
+                    return false;
+            }
+        }
+
+        // then, have hadoop run this MapReduce job:
+        if (pigContext.debug) print();
+        boolean success = mapReduceLauncher.launchPig(this);
+        if (!success) {
+            // XXX: If we throw an exception on failure, why do we return a boolean ?!? - ben
+            throw new IOException("Job failed");
+        }
+        return success;
+    }
+
+    @Override
+	public Tuple getNext() throws IOException {
+        // drain all inputs
+        for (int i = 0; i < inputs.length; i++) {
+            while (((PhysicalOperator)opTable.get(inputs[i])).getNext() != null)
+                ;
+        }
+
+        // indicate that we are done
+        return null;
+    }
+    
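+    /**
+     * Returns the total number of MapReduce jobs in this plan: this job plus,
+     * recursively, all of its input jobs.
+     */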
+    public int numMRJobs() {
+        int numInputJobs = 0;
+        if (inputs != null) {
+            for (OperatorKey i : inputs) {
+                numInputJobs += ((POMapreduce) opTable.get(i)).numMRJobs();
+            }
+        }
+        return 1 + numInputJobs;
+    }
+
+    void print() {
+    	Logger log = PigLogger.getLogger();
+    	log.info("----- MapReduce Job -----");
+    	log.info("Input: " + inputFileSpecs);
+    	log.info("Map: " + toMap);
+		log.info("Group: " + groupFuncs);
+		log.info("Combine: " + toCombine);
+		log.info("Reduce: " + toReduce);
+		log.info("Output: " + outputFileSpec);
+		log.info("Split: " + toSplit);
+		log.info("Map parallelism: " + mapParallelism);
+		log.info("Reduce parallelism: " + reduceParallelism);
+    }
+    
+    public POMapreduce copy(long id){
+    	try{
+    		Map<OperatorKey, ExecPhysicalOperator> srcOpTable = this.opTable;
+    		this.opTable = null;
+    		
+    		POMapreduce copy = ((POMapreduce)ObjectSerializer.deserialize(ObjectSerializer.serialize(this)));
+    		this.opTable = srcOpTable;   // restore the table on this operator after serialization
+    		
+    		copy.pigContext = pigContext;
+    		copy.inputs = inputs;
+    		copy.opTable = srcOpTable;
+    		copy.id = id;
+    		return copy;
+    	}catch(IOException e){
+    		throw new RuntimeException(e);
+    	}
+    }
+    
+    public EvalSpec getEvalSpec(int i){
+    	return toMap.get(i);
+    }
+    
+    public FileSpec getFileSpec(int i){
+    	return inputFileSpecs.get(i);
+    }
+    
+    public void addMapSpec(int i, EvalSpec spec){
+    	if (toMap.get(i) == null)
+    		toMap.set(i, spec);
+    	else
+    		toMap.set(i, toMap.get(i).addSpec(spec));
+    }
+    
+    public void addReduceSpec(EvalSpec spec){
+    	if (toReduce == null)
+    		toReduce = spec;
+    	else
+    		toReduce = toReduce.addSpec(spec);
+    }
+    
+    public void visit(POVisitor v, String prefix) {
+        PrintStream ps = v.getPrintStream();
+        
+        ps.println(prefix + "POMapReduce (" + this + "): scope id: " + this.scope);
+    }
+}
+
+

Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/SplitSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/SplitSpec.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/SplitSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/SplitSpec.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.FunctionInstantiator;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.eval.cond.Cond;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.io.FileLocalizer;
+
+
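+/**
+ * Describes a SPLIT for the Hadoop backend: the split conditions taken from the
+ * LOSplit operator and, for each condition, the temporary file that its matching
+ * tuples are written to.
+ */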
+public class SplitSpec implements Serializable{
+	private static final long serialVersionUID = 1L;
+	
+	public ArrayList<Cond> conditions;
+	public List<String> tempFiles;
+   	
+	private static String getTempFile(PigContext pigContext) throws IOException {
+	    return FileLocalizer.getTemporaryPath(null, pigContext).toString();
+	}
+	
+	public SplitSpec(LOSplit lo, PigContext pigContext){
+		this.conditions = lo.getConditions();
+		tempFiles = new ArrayList<String>();
+		try{
+			for (int i=0; i<conditions.size(); i++){
+				tempFiles.add(getTempFile(pigContext));
+			}
+		}catch (IOException e){
+			// a missing temp file would leave the split spec unusable, so fail fast
+			throw new RuntimeException(e);
+		}
+	}
+		
+	@Override
+	public String toString(){
+		StringBuilder sb = new StringBuilder();
+		for (int i=0; i<conditions.size(); i++){
+			if (i!=0) sb.append(";");
+			sb.append(conditions.get(i).toString());
+			sb.append(";");
+			sb.append(tempFiles.get(i));
+		}
+		return sb.toString();
+	}
+	
+	public void instantiateFunc(FunctionInstantiator instantiator)
+		throws IOException {
+		for(Cond condition : conditions)
+			condition.instantiateFunc(instantiator);
+	}
+}

Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=617338&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java Thu Jan 31 19:11:28 2008
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapreduceExec;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.io.File;
+import java.io.FileOutputStream;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.impl.util.PigLogger;
+import org.apache.pig.backend.hadoop.executionengine.POMapreduce;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.io.PigFile;
+import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.datastorage.DataStorageException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob; 
+
+
+/**
+ * Launches Pig jobs on Hadoop. Given a POMapreduce operator, it builds the job jar,
+ * populates the JobConf with the serialized map, group, combine, reduce and split specs,
+ * submits the job, and monitors its progress until completion.
+ * 
+ * @author breed
+ * 
+ */
+public class MapReduceLauncher {
+    
+    public static long totalHadoopTimeSpent = 0;
+    public static int numMRJobs;
+    public static int mrJobNumber;
+    
+    public static Configuration config = null;
+    public static HExecutionEngine execEngine = null;
+
+    public static void setConf(Configuration configuration) {
+        config = configuration;
+    }
+    
+    public static void setExecEngine(HExecutionEngine executionEngine) {
+        execEngine = executionEngine;
+    }
+    
+    public static void initQueryStatus(int numMRJobsIn) {
+        numMRJobs = numMRJobsIn;
+        mrJobNumber = 0;
+    }
+
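+    /**
+     * Compares output keys as raw bytes. Used when the job does no user-visible
+     * sorting, so keys need not be deserialized for comparison.
+     */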
+    public static class PigWritableComparator extends WritableComparator {
+        public PigWritableComparator() {
+            super(Tuple.class);
+        }
+
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+	    }
+    }
+
+    static Random rand = new Random();
+
+    /**
+     * Submit a Pig job to hadoop.
+     * 
+     * @param pom
+     *            the POMapreduce operator describing the job: its input and output file
+     *            specs, map, group, combine and reduce specs, and requested parallelism.
+     * @return an indicator of success or failure.
+     * @throws IOException
+     */
+    public boolean launchPig(POMapreduce pom) throws IOException {
+        Logger log = PigLogger.getLogger();
+        JobConf conf = new JobConf(config);
+        conf.setJobName(pom.pigContext.getJobName());
+        boolean success = false;
+        List<String> funcs = new ArrayList<String>();
+        
+        if (pom.toMap != null){
+            for (EvalSpec es: pom.toMap)
+                funcs.addAll(es.getFuncs());
+        }
+        if (pom.groupFuncs != null){
+            for(EvalSpec es: pom.groupFuncs)
+                funcs.addAll(es.getFuncs());
+        }
+        if (pom.toReduce != null) {
+            funcs.addAll(pom.toReduce.getFuncs());
+        }
+        
+        // create jobs.jar locally and pass it to hadoop
+        File submitJarFile = File.createTempFile("Job", ".jar");	
+        try {
+            FileOutputStream fos = new FileOutputStream(submitJarFile);
+            JarManager.createJar(fos, funcs, pom.pigContext);
+            log.debug("Job jar size = " + submitJarFile.length());
+            conf.setJar(submitJarFile.getPath());
+            String user = System.getProperty("user.name");
+            conf.setUser(user != null ? user : "Pigster");
+
+            if (pom.reduceParallelism != -1) {
+                conf.setNumReduceTasks(pom.reduceParallelism);
+            }
+            if (pom.toMap != null) {
+                conf.set("pig.mapFuncs", ObjectSerializer.serialize(pom.toMap));
+            }
+            if (pom.toCombine != null) {
+                conf.set("pig.combineFunc", ObjectSerializer.serialize(pom.toCombine));
+            }
+            if (pom.groupFuncs != null) {
+                conf.set("pig.groupFuncs", ObjectSerializer.serialize(pom.groupFuncs));
+            }
+            if (pom.toReduce != null) {
+                conf.set("pig.reduceFunc", ObjectSerializer.serialize(pom.toReduce));
+            }
+            if (pom.toSplit != null) {
+                conf.set("pig.splitSpec", ObjectSerializer.serialize(pom.toSplit));
+            }
+            if (pom.pigContext != null) {
+                conf.set("pig.pigContext", ObjectSerializer.serialize(pom.pigContext));
+            }
+            conf.setMapRunnerClass(PigMapReduce.class);
+            if (pom.toCombine != null) {
+                conf.setCombinerClass(PigCombine.class);
+                //conf.setCombinerClass(PigMapReduce.class);
+            }
+            if (pom.quantilesFile!=null){
+                conf.set("pig.quantilesFile", pom.quantilesFile);
+            }
+            else{
+                // this is not a sort job - can use byte comparison to speed up processing
+                conf.setOutputKeyComparatorClass(PigWritableComparator.class);					
+            }
+            if (pom.partitionFunction!=null){
+                conf.setPartitionerClass(SortPartitioner.class);
+            }
+            conf.setReducerClass(PigMapReduce.class);
+            conf.setInputFormat(PigInputFormat.class);
+            conf.setOutputFormat(PigOutputFormat.class);
+            // not used starting with 0.15 conf.setInputKeyClass(Text.class);
+            // not used starting with 0.15 conf.setInputValueClass(Tuple.class);
+            conf.setOutputKeyClass(Tuple.class);
+            if (pom.userComparator != null) {
+                conf.setOutputKeyComparatorClass(pom.userComparator);
+            }
+            conf.setOutputValueClass(IndexedTuple.class);
+            conf.set("pig.inputs", ObjectSerializer.serialize(pom.inputFileSpecs));
+            
+            conf.setOutputPath(new Path(pom.outputFileSpec.getFileName()));
+            conf.set("pig.storeFunc", pom.outputFileSpec.getFuncSpec());
+
+            //
+            // Now, actually submit the job (using the submit name)
+            //
+            JobClient jobClient = execEngine.getJobClient();
+            RunningJob status = jobClient.submitJob(conf);
+            log.debug("submitted job: " + status.getJobID());
+
+            long sleepTime = 1000;
+            double lastQueryProgress = -1.0;
+            int lastJobsQueued = -1;
+            double lastMapProgress = -1.0;
+            double lastReduceProgress = -1.0;
+            while (true) {
+                try {
+                    Thread.sleep(sleepTime);
+                } catch (Exception e) {}
+
+                if (status.isComplete()) {
+                    success = status.isSuccessful();
+                    log.debug("Job finished " + (success ? "" : "un") + "successfully");
+                    if (success) {
+                        mrJobNumber++;
+                    }
+                    double queryProgress = ((double) mrJobNumber) / ((double) numMRJobs);
+                    if (queryProgress > lastQueryProgress) {
+                        log.info("Pig progress = " + ((int) (queryProgress * 100)) + "%");
+                        lastQueryProgress = queryProgress;
+                    }
+                    break;
+                }
+                else // still running
+                {
+                    double mapProgress = status.mapProgress();
+                    double reduceProgress = status.reduceProgress();
+                    if (lastMapProgress != mapProgress || lastReduceProgress != reduceProgress) {
+                        log.debug("Hadoop job progress: Map=" + (int) (mapProgress * 100) + "% Reduce="
+                                + (int) (reduceProgress * 100) + "%");
+                        lastMapProgress = mapProgress;
+                        lastReduceProgress = reduceProgress;
+                    }
+                    double numJobsCompleted = mrJobNumber;
+                    double thisJobProgress = (mapProgress + reduceProgress) / 2.0;
+                    double queryProgress = (numJobsCompleted + thisJobProgress) / ((double) numMRJobs);
+                    if (queryProgress > lastQueryProgress) {
+                        log.info("Pig progress = " + ((int) (queryProgress * 100)) + "%");
+                        lastQueryProgress = queryProgress;
+                    }
+                }
+            }
+
+            // bug 1030028: if the input file is empty; hadoop doesn't create the output file!
+            Path outputFile = conf.getOutputPath();
+            String outputName = outputFile.getName();
+            int colon = outputName.indexOf(':');
+            if (colon != -1) {
+                outputFile = new Path(outputFile.getParent(), outputName.substring(0, colon));
+            }
+
+            try {
+                ElementDescriptor descriptor = 
+                    ((HDataStorage)(pom.pigContext.getDfs())).asElement(outputFile.toString());
+
+                if (success && !descriptor.exists()) {
+                    // create an empty output file
+                    PigFile f = new PigFile(outputFile.toString(), false);
+                    f.store(BagFactory.getInstance().newDefaultBag(), 
+                            new PigStorage(), 
+                            pom.pigContext);
+                }
+            }
+            catch (DataStorageException e) {
+                throw new IOException("Failed to obtain descriptor for " + outputFile.toString(), e);
+            }
+
+            if (!success) {
+                // go find the error messages
+                getErrorMessages(jobClient.getMapTaskReports(status.getJobID()),
+                        "map", log);
+                getErrorMessages(jobClient.getReduceTaskReports(status.getJobID()),
+                        "reduce", log);
+            }
+            else {
+                long timeSpent = 0;
+
+                // NOTE: this call is crashing due to a bug in Hadoop; the bug is known and the patch has not been applied yet.
+                TaskReport[] mapReports = jobClient.getMapTaskReports(status.getJobID());
+                TaskReport[] reduceReports = jobClient.getReduceTaskReports(status.getJobID());
+                for (TaskReport r : mapReports) {
+                    timeSpent += (r.getFinishTime() - r.getStartTime());
+                }
+                for (TaskReport r : reduceReports) {
+                    timeSpent += (r.getFinishTime() - r.getStartTime());
+                }
+                totalHadoopTimeSpent += timeSpent;
+            }
+        }
+        catch (Exception e) {
+            // Do we need different handling for different exceptions
+            e.printStackTrace();
+            throw new IOException(e.getMessage());
+        }
+        finally {
+            submitJarFile.delete();
+        }
+        return success;
+    }
+
+    private void getErrorMessages(TaskReport reports[], String type, Logger log) {
+        for (int i = 0; i < reports.length; i++) {
+            String msgs[] = reports[i].getDiagnostics();
+            StringBuilder sb = new StringBuilder("Error message from task (" + type + ") " +
+                reports[i].getTaskId());
+            for (int j = 0; j < msgs.length; j++) {
+                sb.append(" " + msgs[j]);
+            }
+            log.error(sb.toString());
+        }
+    }
+
+}