You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/16 03:09:32 UTC

svn commit: r656913 [1/3] - in /incubator/pig/branches/types: ./ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/mapReduceLayer...

Author: gates
Date: Thu May 15 18:09:30 2008
New Revision: 656913

URL: http://svn.apache.org/viewvc?rev=656913&view=rev
Log:
PIG-162 Shubham's addition MR launcher code.


Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinderForExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
Removed:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SortPartitioner.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserComparisonFunc.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserFunc.java
Modified:
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC11.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC12.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC13.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC14.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC2.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC3.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC4.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC5.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC6.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC7.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC8.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC9.gld
    incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java

Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Thu May 15 18:09:30 2008
@@ -146,7 +146,8 @@
         		**/test/TestPODistinct.java, **/test/TestPOSort.java,
         		**/test/TestSchema.java, **/test/TestLogicalPlanBuilder.java,**/test/TestUnion.java, **/test/TestMRCompiler.java,
                 **/test/FakeFSInputStream.java, **/test/Util.java, **/test/TestJobSubmission.java,
-        		**/test/TestLocalJobSubmission.java, **/test/TestPOMapLookUp.java,
+        		**/test/TestLocalJobSubmission.java, **/test/TestPOMapLookUp.java, 
+        		**/test/TestPOBinCond.java, **/test/TestPONegative.java, **/pig/impl/builtin/GFCross.java,
                 **/logicalLayer/*.java, **/logicalLayer/parser/NodeIdGenerator.java,
                 **/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java,
                 **/physicalLayer/topLevelOperators/**/*.java, **/physicalLayer/plans/*.java,
@@ -277,7 +278,8 @@
                 	<include name="**/TestLogicalPlanBuilder.java" />
                 	<include name="**/TestLocalJobSubmission.java" />
                 	<include name="**/TestPOMapLookUp.java" />
-                	
+                	<include name="**/TestPOBinCond.java" />
+                	<include name="**/TestPONegative.java" />
                     <!--
                     <include name="**/*Test*.java" />
                     <exclude name="**/TestLargeFile.java" />

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Thu May 15 18:09:30 2008
@@ -57,11 +57,11 @@
 import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.SortPartitioner;
 import org.apache.pig.backend.hadoop.datastorage.HFile;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
+import org.apache.pig.impl.mapReduceLayer.SortPartitioner;
 
 // compiler for mapreduce physical plans
 public class MapreducePlanCompiler {

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java Thu May 15 18:09:30 2008
@@ -1,4 +1,5 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -46,6 +47,7 @@
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.eval.EvalSpec;
 import org.apache.pig.impl.io.PigFile;
+import org.apache.pig.impl.mapReduceLayer.SortPartitioner;
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.WrappedIOException;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java Thu May 15 18:09:30 2008
@@ -21,6 +21,7 @@
 import java.util.Iterator;
 
 import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
@@ -36,8 +37,16 @@
     
     @Override
     public DataBag exec(Tuple input) throws IOException {
-        Integer numQuantiles = (Integer)input.get(0);
-        DataBag samples = (DataBag)input.get(1);
+        Integer numQuantiles = null;
+        DataBag samples = null;
+        try{
+            numQuantiles = (Integer)input.get(0);
+            samples = (DataBag)input.get(1);
+        }catch(ExecException e){
+            IOException ioe = new IOException();
+            ioe.initCause(e);
+            throw ioe;
+        }
         DataBag output = mBagFactory.newDefaultBag();
         
         long numSamples = samples.size();

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java Thu May 15 18:09:30 2008
@@ -21,6 +21,7 @@
 import java.util.Random;
 
 import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
@@ -35,33 +36,40 @@
     public static int DEFAULT_PARALLELISM = 96;
 
     @Override
-    public DataBag exec(Tuple input) throws IOException {;
-        numInputs = (Integer)input.get(0);
-        myNumber = (Integer)input.get(1);
-        DataBag output = mBagFactory.newDefaultBag();
+    public DataBag exec(Tuple input) throws IOException {
+        try{
+            numInputs = (Integer)input.get(0);
+            myNumber = (Integer)input.get(1);
         
-        numGroupsPerInput = (int) Math.ceil(Math.pow(DEFAULT_PARALLELISM, 1.0/numInputs));
-        int numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
+            DataBag output = mBagFactory.newDefaultBag();
             
-        int[] digits = new int[numInputs];
-        for (int i=0; i<numInputs; i++){
-            if (i == myNumber){
-                Random r = new Random(System.currentTimeMillis());
-                digits[i] = r.nextInt(numGroupsPerInput);
-            }else{
-                digits[i] = 0;
+            numGroupsPerInput = (int) Math.ceil(Math.pow(DEFAULT_PARALLELISM, 1.0/numInputs));
+            int numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
+                
+            int[] digits = new int[numInputs];
+            for (int i=0; i<numInputs; i++){
+                if (i == myNumber){
+                    Random r = new Random(System.currentTimeMillis());
+                    digits[i] = r.nextInt(numGroupsPerInput);
+                }else{
+                    digits[i] = 0;
+                }
             }
+                
+            for (int i=0; i<numGroupsGoingTo; i++){
+                output.add(toTuple(digits));
+                next(digits);
+            }            
+    
+            return output;
+        }catch(ExecException e){
+            IOException ioe = new IOException();
+            ioe.initCause(e);
+            throw ioe;
         }
-            
-        for (int i=0; i<numGroupsGoingTo; i++){
-            output.add(toTuple(digits));
-            next(digits);
-        }            
-
-        return output;
     }
     
-    private Tuple toTuple(int[] digits) throws IOException{
+    private Tuple toTuple(int[] digits) throws IOException, ExecException{
         Tuple t = mTupleFactory.newTuple(numInputs);
         for (int i=0; i<numInputs; i++){
             t.set(i, digits[i]);

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java Thu May 15 18:09:30 2008
@@ -242,12 +242,18 @@
                 mro.reducePlan.remove(pack);
                 jobConf.setMapperClass(PigMapReduce.Map.class);
                 jobConf.setReducerClass(PigMapReduce.Reduce.class);
+                jobConf.setNumReduceTasks((mro.requestedParallelism>0)?mro.requestedParallelism:1);
                 jobConf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
                 jobConf.set("pig.reducePlan", ObjectSerializer.serialize(mro.reducePlan));
                 jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
                 jobConf.setOutputKeyClass(DataType.getWritableComparableTypes(pack.getKeyType()).getClass());
                 jobConf.setOutputValueClass(IndexedTuple.class);
             }
+            
+            if(mro.isGlobalSort()){
+                jobConf.set("pig.quantilesFile", mro.getQuantFile());
+                jobConf.setPartitionerClass(SortPartitioner.class);
+            }
     
             return jobConf;
         }catch(Exception e){

Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java Thu May 15 18:09:30 2008
@@ -0,0 +1,162 @@
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class Launcher {
+    private static final Log log = LogFactory.getLog(Launcher.class);
+    
+    protected Launcher(){
+        
+    }
+    /**
+     * Method to launch pig for hadoop either for a cluster's
+     * job tracker or for a local job runner. The only difference
+     * between the two is the job client. Depending on the pig context
+     * the job client will be initialized to one of the two.
+     * Launchers for other frameworks can override these methods.
+     * Given an input PhysicalPlan, it compiles it
+     * to get a MapReduce Plan. The MapReduce plan
+     * has multiple MapReduce operators, each one of which
+     * has to be run as a map reduce job with dependency
+     * information stored in the plan. It compiles the
+     * MROperPlan into a JobControl object. Each Map Reduce
+     * operator is converted into a Job and added to the JobControl
+     * object. Each Job also has a set of dependent Jobs that
+     * are created using the MROperPlan.
+     * The JobControl object is obtained from the JobControlCompiler
+     * Then a new thread is spawned that submits these jobs
+     * while respecting the dependency information.
+     * The parent thread monitors the submitted jobs' progress and
+     * after it is complete, stops the JobControl thread.
+     * @param php
+     * @param grpName
+     * @param pc
+     * @throws PlanException
+     * @throws VisitorException
+     * @throws IOException
+     * @throws ExecException
+     * @throws JobCreationException
+     */
+    protected void launchPig(PhysicalPlan<PhysicalOperator> php, String grpName, PigContext pc)
+            throws PlanException, VisitorException, IOException, ExecException,
+            JobCreationException {
+        long sleepTime = 500;
+        MRCompiler comp = new MRCompiler(php, pc);
+        comp.compile();
+        
+        ExecutionEngine exe = pc.getExecutionEngine();
+        Configuration conf = ((HConfiguration)exe.getConfiguration()).getConfiguration();
+        JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
+
+        MROperPlan mrp = comp.getMRPlan();
+        JobControlCompiler jcc = new JobControlCompiler();
+        
+        JobControl jc = jcc.compile(mrp, grpName, conf, pc);
+        
+        new Thread(jc).start();
+
+        int numMRJobs = jc.getWaitingJobs().size();
+        double lastProg = -1;
+        while(!jc.allFinished()){
+            try {
+                Thread.sleep(sleepTime);
+            } catch (InterruptedException e) {}
+            double prog = calculateProgress(jc, jobClient)/numMRJobs;
+            if(prog>lastProg)
+                log.info(prog * 100 + "% complete");
+            lastProg = prog;
+        }
+        lastProg = calculateProgress(jc, jobClient)/numMRJobs;
+        if(lastProg==1.0)
+            log.info("Completed Successfully");
+        else{
+            log.info("Unsuccessful attempt. Completed " + lastProg * 100 + "% of the job");
+            List<Job> failedJobs = jc.getFailedJobs();
+            for (Job job : failedJobs) {
+                String MRJobID = job.getMapredJobID();
+                getErrorMessages(jobClient.getMapTaskReports(MRJobID), "map");
+                getErrorMessages(jobClient.getReduceTaskReports(MRJobID), "reduce");
+            }
+        }
+
+        jc.stop(); 
+        
+    }
+    
+    protected static void getErrorMessages(TaskReport reports[], String type)
+    {
+        for (int i = 0; i < reports.length; i++) {
+            String msgs[] = reports[i].getDiagnostics();
+            StringBuilder sb = new StringBuilder("Error message from task (" + type + ") " +
+                reports[i].getTaskId());
+            for (int j = 0; j < msgs.length; j++) {
+                sb.append(" " + msgs[j]);
+            }
+            log.error(sb.toString());
+        }
+    }
+    
+    /**
+     * Compute the progress of the current job submitted 
+     * through the JobControl object jc to the JobClient jobClient
+     * @param jc - The JobControl object that has been submitted
+     * @param jobClient - The JobClient to which it has been submitted
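+     * @return The summed progress of all jobs as a double (1.0 per successful job plus the progress of each running job); divide by the number of jobs to get the overall fraction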
+     * @throws IOException
+     */
+    protected static double calculateProgress(JobControl jc, JobClient jobClient) throws IOException{
+        double prog = 0.0;
+        prog += jc.getSuccessfulJobs().size();
+        
+        List runnJobs = jc.getRunningJobs();
+        for (Object object : runnJobs) {
+            Job j = (Job)object;
+            prog += progressOfRunningJob(j, jobClient);
+        }
+        return prog;
+    }
+    
+    /**
+     * Returns the progress of a Job j which is part of a submitted
+     * JobControl object. The progress is for this Job. So it has to
+     * be scaled down by the num of jobs that are present in the 
+     * JobControl.
+     * @param j - The Job for which progress is required
+     * @param jobClient - the JobClient to which it has been submitted
+     * @return The unscaled progress of this Job as the average of its map and reduce progress (a fraction, not a percentage)
+     * @throws IOException
+     */
+    protected static double progressOfRunningJob(Job j, JobClient jobClient) throws IOException{
+        String mrJobID = j.getMapredJobID();
+        RunningJob rj = jobClient.getJob(mrJobID);
+        if(rj==null && j.getState()==Job.SUCCESS)
+            return 1;
+        else if(rj==null)
+            return 0;
+        else{
+            double mapProg = rj.mapProgress();
+            double redProg = rj.reduceProgress();
+            return (mapProg + redProg)/2;
+        }
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java Thu May 15 18:09:30 2008
@@ -0,0 +1,18 @@
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.IOException;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+
+public class LocalLauncher extends Launcher{
+    @Override
+    public void launchPig(PhysicalPlan<PhysicalOperator> php, String grpName, PigContext pc) throws PlanException, VisitorException, IOException, ExecException, JobCreationException {
+        super.launchPig(php, grpName, pc);
+    }
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java Thu May 15 18:09:30 2008
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -30,25 +31,39 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.RandomSampleLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
 import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
 import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.mapReduceLayer.plans.UDFFinder;
+import org.apache.pig.impl.mapReduceLayer.plans.UDFFinderForExpr;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
 import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PORead;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSort;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.PlanException;
@@ -73,13 +88,19 @@
  * being to keep the number of MROpers to a minimum.
  * 
  * It also merges multiple Map jobs, created by compiling
- * the inputs individually, into a single job.
+ * the inputs individually, into a single job. Here a new
+ * map job is created and then the contents of the previous
+ * map plans are added. However, any other state that was in
+ * the previous map plans should be manually moved over. So,
+ * if you are adding new state, take care to carry it over here.
+ * An example of this is requestedParallelism.
  * 
  * Only in case of blocking operators and splits, a new 
  * MapReduce operator is started using a store-load combination
  * to connect the two operators. Whenever this happens
  * care is taken to add the MROper into the MRPlan and connect it
  * appropriately.
+ * 
  *
  */
 public class MRCompiler extends PhyPlanVisitor<PhysicalOperator, PhysicalPlan<PhysicalOperator>> {
@@ -117,6 +138,10 @@
     
     private Random r;
     
+    private UDFFinderForExpr udfFinderForExpr;
+    
+    private UDFFinder udfFinder;
+    
     public MRCompiler(PhysicalPlan<PhysicalOperator> plan) {
         this(plan,null);
     }
@@ -132,6 +157,8 @@
         scope = "MRCompiler";
         r = new Random(1331);
         FileLocalizer.setR(r);
+        udfFinderForExpr = new UDFFinderForExpr();
+        udfFinder = new UDFFinder();
     }
     
     /**
@@ -218,6 +245,8 @@
         //Now we have the inputs compiled. Do something
         //with the input oper op.
         op.visit(this);
+        if(op.getRequestedParallelism() > curMROp.requestedParallelism)
+            curMROp.requestedParallelism = op.getRequestedParallelism();
         compiledInputs = prevCompInp;
     }
     
@@ -378,21 +407,9 @@
         }
     }
     
-    /**
-     * Used to compile a split operator. The logic is to
-     * close the split job by replacing the split oper by
-     * a store and creating a new Map MRoper and return
-     * that as the current MROper to which other operators
-     * would be compiled into. The new MROper would be connected
-     * to the split job by load-store. Also add the split oper 
-     * to the splitsSeen map.
-     * @param op
-     * @throws IOException
-     * @throws PlanException 
-     */
-    private void split(POSplit op) throws PlanException{
+    
+    private MapReduceOper endSingleInputPlanWithStr(FileSpec fSpec) throws PlanException{
         MapReduceOper mro = compiledInputs[0];
-        FileSpec fSpec = op.getSplitStore();
         POStore str = getStore();
         str.setSFile(fSpec);
         if (!mro.isMapDone()) {
@@ -404,8 +421,7 @@
         } else {
             log.warn("Both map and reduce phases have been done. This is unexpected while compiling!");
         }
-        splitsSeen.put(op.getOperatorKey(), mro);
-        curMROp = startNew(fSpec, mro);
+        return mro;
     }
     
     /**
@@ -486,11 +502,15 @@
             }
         }
         merge(ret.get(0).mapPlan, mpLst);
+        
         Iterator<MapReduceOper> it = toBeConnected.iterator();
         while(it.hasNext())
             MRPlan.connect(it.next(), mergedMap);
-        for(MapReduceOper rmro : remLst)
+        for(MapReduceOper rmro : remLst){
+            if(rmro.requestedParallelism > mergedMap.requestedParallelism)
+                mergedMap.requestedParallelism = rmro.requestedParallelism;
             MRPlan.remove(rmro);
+        }
         return ret;
     }
     
@@ -508,14 +528,43 @@
             finPlan.merge(e);
         }
     }
+
+    private void addUDFs(ExprPlan plan) throws VisitorException{
+        if(plan!=null){
+            udfFinderForExpr.setPlan(plan);
+            udfFinderForExpr.visit();
+            curMROp.UDFs.addAll(udfFinderForExpr.getUDFs());
+        }
+    }
+    
+    private void addUDFs(PhysicalPlan<PhysicalOperator> plan) throws VisitorException{
+        if(plan!=null){
+            udfFinder.setPlan(plan);
+            udfFinder.visit();
+            curMROp.UDFs.addAll(udfFinder.getUDFs());
+        }
+    }
+    
+    
+    /* The visitOp methods that decide what to do with the current operator */
     
     /**
-     * The visitOp methods that decide what to do with the current operator
+     * Compiles a split operator. The logic is to
+     * close the split job by replacing the split oper by
+     * a store and creating a new Map MRoper and return
+     * that as the current MROper to which other operators
+     * would be compiled into. The new MROper would be connected
+     * to the split job by load-store. Also add the split oper 
+     * to the splitsSeen map.
+     * @param op - The split operator
+     * @throws VisitorException
      */
-    
     public void visitSplit(POSplit op) throws VisitorException{
         try{
-            split(op);
+            FileSpec fSpec = op.getSplitStore();
+            MapReduceOper mro = endSingleInputPlanWithStr(fSpec);
+            splitsSeen.put(op.getOperatorKey(), mro);
+            curMROp = startNew(fSpec, mro);
         }catch(Exception e){
             VisitorException pe = new VisitorException(e.getMessage());
             pe.initCause(e);
@@ -546,6 +595,7 @@
     public void visitFilter(POFilter op) throws VisitorException{
         try{
             nonBlocking(op);
+            addUDFs(op.getPlan());
         }catch(Exception e){
             VisitorException pe = new VisitorException(e.getMessage());
             pe.initCause(e);
@@ -556,6 +606,10 @@
     public void visitLocalRearrange(POLocalRearrange op) throws VisitorException {
         try{
             nonBlocking(op);
+            List<ExprPlan> plans = op.getPlans();
+            if(plans!=null)
+                for(ExprPlan ep : plans)
+                    addUDFs(ep);
         }catch(Exception e){
             VisitorException pe = new VisitorException(e.getMessage());
             pe.initCause(e);
@@ -566,6 +620,7 @@
     public void visitForEach(POForEach op) throws VisitorException{
         try{
             nonBlocking(op);
+            addUDFs(op.getPlan());
         }catch(Exception e){
             VisitorException pe = new VisitorException(e.getMessage());
             pe.initCause(e);
@@ -602,4 +657,273 @@
             throw pe;
         }
     }
+
+    /** Compiles an order-by: ends the current plan against a temp file spec (endSingleInputPlanWithStr), then
+     *  builds the quantile job and the sort job over that file; the sort job becomes the current MR operator. */
+    @Override
+    public void visitSort(POSort op) throws VisitorException {
+        try{
+            FileSpec sortInput = getTempFileSpec();
+            MapReduceOper inputJob = endSingleInputPlanWithStr(sortInput);
+            FileSpec quantilesSpec = getTempFileSpec();
+            int parallelism = op.getRequestedParallelism();
+            int[] sortCols = getSortCols(op);
+            MapReduceOper quantJob = getQuantileJob(op, inputJob, sortInput, quantilesSpec, parallelism, sortCols);
+            curMROp = getSortJob(quantJob, sortInput, quantilesSpec, parallelism, sortCols);
+        }catch(Exception e){
+            throw (VisitorException) new VisitorException(e.getMessage()).initCause(e);
+        }
+    }
+    
+    /** Returns the column of the first leaf (cast to POProject) of each sort plan, or null when the sort has no plans. */
+    private int[] getSortCols(POSort sort){
+        List<ExprPlan> sortPlans = sort.getSortPlans();
+        if(sortPlans==null) return null;
+        int[] cols = new int[sortPlans.size()];
+        int next = 0;
+        for (ExprPlan sortPlan : sortPlans) {
+            POProject leaf = (POProject)sortPlan.getLeaves().get(0);
+            cols[next++] = leaf.getColumn();
+        }
+        return cols;
+    }
+    
+    /** Builds the order-by job: started via startNew(lFile, quantJob), flagged as a global sort using quantFile, with parallelism rp. */
+    public MapReduceOper getSortJob(MapReduceOper quantJob, FileSpec lFile, FileSpec quantFile, int rp, int[] fields) throws PlanException{
+        MapReduceOper mro = startNew(lFile, quantJob);
+        mro.setQuantFile(quantFile.getFileName());
+        mro.setGlobalSort(true);
+        mro.requestedParallelism = rp;
+        // One POProject expression plan per sort column; these become the local rearrange plans below.
+        List<ExprPlan> eps1 = new ArrayList<ExprPlan>();
+        // fields is null e.g. when getSortCols found no sort plans; note the new MR operator already exists by now.
+        if(fields==null)
+            throw new PlanException("No Expression Plan found in POSort");
+        for (int i : fields) {
+            ExprPlan ep = new ExprPlan();
+            POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+            prj.setColumn(i);
+            prj.setOverloaded(false);
+            prj.setResultType(DataType.BYTEARRAY);
+            ep.add(prj);
+            eps1.add(ep);
+        }
+        // Map plan ends with a local rearrange over the sort columns (TUPLE key when there is more than one).
+        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        lr.setIndex(0);
+        lr.setKeyType((fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
+        lr.setPlans(eps1);
+        lr.setResultType(DataType.TUPLE);
+        mro.mapPlan.addAsLeaf(lr);
+        
+        mro.setMapDone(true);
+        // Reduce plan: a package followed by a foreach whose generate flattens column 1 of its input.
+        POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        pkg.setKeyType((fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
+        pkg.setNumInps(1);
+        boolean[] inner = {false}; 
+        pkg.setInner(inner);
+        mro.reducePlan.add(pkg);
+        
+        ExprPlan ep = new ExprPlan();
+        POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        prj.setColumn(1);
+        prj.setOverloaded(false);
+        prj.setResultType(DataType.BYTEARRAY);
+        ep.add(prj);
+        List<ExprPlan> eps2 = new ArrayList<ExprPlan>();
+        eps2.add(ep);
+        List<Boolean> flattened = new ArrayList<Boolean>();
+        flattened.add(true);
+        POGenerate fe1Gen = new POGenerate(new OperatorKey(scope,nig.getNextNodeId(scope)),eps2,flattened);
+        fe1Gen.setResultType(DataType.TUPLE);
+        PhysicalPlan<PhysicalOperator> fe1Plan = new PhysicalPlan<PhysicalOperator>();
+        fe1Plan.add(fe1Gen);
+        POForEach fe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        fe1.setPlan(fe1Plan);
+        fe1.setResultType(DataType.TUPLE);
+        mro.reducePlan.add(fe1);
+        mro.reducePlan.connect(pkg, fe1);
+        return mro;
+    }
+
+    /** Builds the quantile job: started via startNew on lFile paired with RandomSampleLoader, it groups the sort columns under one key, sorts them, applies FindQuantiles and stores to quantFile. */
+    public MapReduceOper getQuantileJob(POSort sort, MapReduceOper prevJob, FileSpec lFile, FileSpec quantFile, int rp, int[] fields) throws PlanException, VisitorException {
+        FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),RandomSampleLoader.class.getName());
+        MapReduceOper mro = startNew(quantLdFilName, prevJob);
+        mro.UDFs.add(FindQuantiles.class.getName());
+        if(sort.isUDFComparatorUsed)
+            mro.UDFs.add(sort.getMSortFunc().getFuncSpec());
+        // Map side, step 1: a foreach that generates the sort columns, each flattened.
+        List<ExprPlan> eps1 = new ArrayList<ExprPlan>();
+        List<Boolean> flat1 = new ArrayList<Boolean>();
+        
+        if(fields==null)
+            throw new PlanException("No Expression Plan found in POSort");
+        for (int i : fields) {
+            ExprPlan ep = new ExprPlan();
+            POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+            prj.setColumn(i);
+            prj.setOverloaded(false);
+            prj.setResultType(DataType.BYTEARRAY);
+            ep.add(prj);
+            eps1.add(ep);
+            flat1.add(true);
+        }
+        POGenerate fe1Gen = new POGenerate(new OperatorKey(scope,nig.getNextNodeId(scope)),eps1,flat1);
+        fe1Gen.setResultType(DataType.TUPLE);
+        PhysicalPlan<PhysicalOperator> fe1Plan = new PhysicalPlan<PhysicalOperator>();
+        fe1Plan.add(fe1Gen);
+        POForEach fe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        fe1.setPlan(fe1Plan);
+        fe1.setResultType(DataType.TUPLE);
+        mro.mapPlan.addAsLeaf(fe1);
+        // Map side, step 2: a local rearrange keyed on the constant "all", so every record shares one key.
+        ExprPlan ep1 = new ExprPlan();
+        ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        ce.setValue("all");
+        ep1.add(ce);
+        
+        List<ExprPlan> eps = new ArrayList<ExprPlan>();
+        eps.add(ep1);
+        
+        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        lr.setIndex(0);
+        lr.setKeyType(DataType.CHARARRAY);
+        lr.setPlans(eps);
+        lr.setResultType(DataType.TUPLE);
+        mro.mapPlan.add(lr);
+        mro.mapPlan.connect(fe1, lr);
+        
+        mro.setMapDone(true);
+        
+        POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        pkg.setKeyType(DataType.CHARARRAY);
+        pkg.setNumInps(1);
+        boolean[] inner = {false}; 
+        pkg.setInner(inner);
+        mro.reducePlan.add(pkg);
+        // Reduce side, first foreach: project column 1 -> the given sort -> generate (rp constant, project-star).
+        PhysicalPlan<PhysicalOperator> fe2Plan = new PhysicalPlan<PhysicalOperator>();
+        
+        POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        topPrj.setColumn(1);
+        topPrj.setOverloaded(true);
+        topPrj.setResultType(DataType.TUPLE);
+        fe2Plan.add(topPrj);
+        
+        ExprPlan nesSortPlan = new ExprPlan();
+        POProject prjStar2 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        prjStar2.setResultType(DataType.TUPLE);
+        prjStar2.setStar(true);
+        nesSortPlan.add(prjStar2);
+        
+        List<ExprPlan> nesSortPlanLst = new ArrayList<ExprPlan>();
+        nesSortPlanLst.add(nesSortPlan);
+        // NOTE: this mutates the caller's POSort (its sort plans become a single project-star) and adds it to fe2Plan.
+        sort.setSortPlans(nesSortPlanLst);
+        fe2Plan.add(sort);
+        fe2Plan.connect(topPrj, sort);
+        
+        ExprPlan ep3 = new ExprPlan();
+        POProject prjStar3 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        prjStar3.setResultType(DataType.TUPLE);
+        prjStar3.setStar(true);
+        ep3.add(prjStar3);
+        // The constant carries rp, or 1 when rp is not positive.
+        ExprPlan rpep = new ExprPlan();
+        ConstantExpression rpce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        rpce.setRequestedParallelism(rp);
+        rpce.setValue(rp<=0?1:rp);
+        rpep.add(rpce);
+        
+        List<ExprPlan> genEps = new ArrayList<ExprPlan>();
+        genEps.add(rpep);
+        genEps.add(ep3);
+        
+        List<Boolean> flattened2 = new ArrayList<Boolean>();
+        flattened2.add(false);
+        flattened2.add(false);
+        POGenerate nesGen = new POGenerate(new OperatorKey(scope,nig.getNextNodeId(scope)), genEps, flattened2);
+        fe2Plan.add(nesGen);
+        fe2Plan.connect(sort, nesGen);
+        
+        POForEach fe2 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        fe2.setPlan(fe2Plan);
+        fe2.setResultType(DataType.TUPLE);
+        
+        mro.reducePlan.add(fe2);
+        mro.reducePlan.connect(pkg, fe2);
+        // Second foreach: FindQuantiles applied to a project-star of the previous foreach's output.
+        ExprPlan ep4 = new ExprPlan();
+        POProject prjStar4 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        prjStar4.setResultType(DataType.TUPLE);
+        prjStar4.setStar(true);
+        ep4.add(prjStar4);
+        
+        List ufInps = new ArrayList();
+        ufInps.add(prjStar4);
+        POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps, FindQuantiles.class.getName());
+        ep4.add(uf);
+        ep4.connect(prjStar4, uf);
+        
+        List<ExprPlan> ep4s = new ArrayList<ExprPlan>();
+        ep4s.add(ep4);
+        List<Boolean> flattened3 = new ArrayList<Boolean>();
+        flattened3.add(false);
+        POGenerate finGen = new POGenerate(new OperatorKey(scope,nig.getNextNodeId(scope)), ep4s, flattened3);
+        
+        PhysicalPlan<PhysicalOperator> fe3Plan = new PhysicalPlan<PhysicalOperator>();
+        fe3Plan.add(finGen);
+        
+        POForEach fe3 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        fe3.setPlan(fe3Plan);
+        fe3.setResultType(DataType.TUPLE);
+        
+        mro.reducePlan.add(fe3);
+        mro.reducePlan.connect(fe2, fe3);
+        // The reduce plan ends in a store pointed at quantFile.
+        POStore str = getStore();
+        str.setSFile(quantFile);
+        mro.reducePlan.add(str);
+        mro.reducePlan.connect(fe3, str);
+        // requestedParallelism is deliberately not set on this job; rp is only embedded in the constant above.
+        mro.setReduceDone(true);
+        return mro;
+    }
+    
+    /** Ad-hoc driver: builds a load -> sort -> store map plan, compiles it, and prints then removes the first leaf MR operator three times. */
+    public static void main(String[] args) throws PlanException, IOException, ExecException, VisitorException {
+        PigContext pc = new PigContext();
+        pc.connect();
+        MRCompiler comp = new MRCompiler(null, pc);
+        Random r = new Random();
+        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        pr1.setResultType(DataType.INTEGER);
+        ExprPlan expPlan = new ExprPlan();
+        expPlan.add(pr1);
+        sortPlans.add(expPlan);
+        List<Boolean> mAscCols = new LinkedList<Boolean>();
+        mAscCols.add(false);
+        MapReduceOper pj = comp.getMROp();
+        POLoad ld = comp.getLoad();
+        pj.mapPlan.add(ld);
+
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, null,
+                sortPlans, mAscCols, null);
+        pj.mapPlan.addAsLeaf(sort);
+        
+        POStore st = comp.getStore();
+        pj.mapPlan.addAsLeaf(st);
+        // Compile the hand-built plan with a second compiler; the loop's get(0) presumes a leaf is still present.
+        MRCompiler c1 = new MRCompiler(pj.mapPlan,pc);
+        c1.compile();
+        MROperPlan plan = c1.getMRPlan();
+        for(int i=0;i<3;i++){
+            MapReduceOper job = plan.getLeaves().get(0);
+            System.out.println(job.name());
+            plan.remove(job);
+        }
+    }
 }

Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java Thu May 15 18:09:30 2008
@@ -0,0 +1,22 @@
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.IOException;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Launcher for the Map Reduce mode. launchPig currently adds nothing of
+ * its own and simply delegates to the Launcher implementation.
+ */
+public class MapReduceLauncher extends Launcher{
+
+    @Override
+    public void launchPig(PhysicalPlan<PhysicalOperator> php, String grpName, PigContext pc) throws PlanException, VisitorException, IOException, ExecException, JobCreationException {
+        super.launchPig(php, grpName, pc);
+    }
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java Thu May 15 18:09:30 2008
@@ -65,11 +65,16 @@
     //Indicates if this job is an order by job
     boolean globalSort = false;
     
+    //The quantiles file name if globalSort is true
+    String quantFile;
+    
     public List<String> UDFs;
     
     NodeIdGenerator nig;
 
     private String scope;
+    
+    int requestedParallelism = -1;
 
     public MapReduceOper(OperatorKey k) {
         super(k);
@@ -104,7 +109,10 @@
      */
     @Override
     public String name() {
-        StringBuilder sb = new StringBuilder("MapReduce - " + mKey.toString()
+        String udfStr = getUDFsAsStr();
+        
+        StringBuilder sb = new StringBuilder("MapReduce" + "(" + requestedParallelism + 
+                (udfStr.equals("")? "" : ",") + udfStr + ")" + " - " + mKey.toString()
                 + ":\n");
         int index = sb.length();
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -126,6 +134,18 @@
         return sb.toString();
     }
 
+    /** Joins the simple names (text after the last '.') of the UDFs with commas; "" when there are none. */
+    private String getUDFsAsStr() {
+        if(UDFs==null || UDFs.isEmpty()) return "";
+        StringBuilder joined = new StringBuilder();
+        String sep = "";
+        for (String udf : UDFs) {
+            joined.append(sep).append(udf.substring(udf.lastIndexOf('.')+1));
+            sep = ",";
+        }
+        return joined.toString();
+    }
+
     @Override
     public boolean supportsMultipleInputs() {
         return true;
@@ -145,7 +165,7 @@
         return mapDone;
     }
     
-    public void setMapDone(boolean mapDone) throws IOException{
+    public void setMapDone(boolean mapDone){
         this.mapDone = mapDone;
     }
     
@@ -214,4 +234,12 @@
     public void setGlobalSort(boolean globalSort) {
         this.globalSort = globalSort;
     }
+
+    public String getQuantFile() {
+        return quantFile;
+    }
+
+    public void setQuantFile(String quantFile) {
+        this.quantFile = quantFile;
+    }
 }

Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java Thu May 15 18:09:30 2008
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+
+
+/** Partitioner for order-by jobs: binary-searches the quantile tuples read from the file named by "pig.quantilesFile". */
+public class SortPartitioner implements Partitioner {
+    Tuple[] quantiles;
+    WritableComparator comparator;
+    
+    /** Returns the binary-search position (match index or insertion point) of the key's first field, capped at numPartitions - 1. */
+    public int getPartition(WritableComparable key, Writable value,
+            int numPartitions) {
+        try{
+            Tuple keyTuple = (Tuple)key;
+            int index = Arrays.binarySearch(quantiles, (Tuple)keyTuple.get(0), comparator);
+            if (index < 0)
+                index = -index-1;
+            return Math.min(index, numPartitions - 1);
+        }catch(ExecException e){
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Loads the quantile tuples and the job's output key comparator; fails with a RuntimeException if the file is unset or unreadable. */
+    public void configure(JobConf job) {
+        String quantilesFile = job.get("pig.quantilesFile", "");
+        if (quantilesFile.length() == 0)
+            throw new RuntimeException("Sort paritioner used but no quantiles found");
+        
+        try{
+            InputStream is = FileLocalizer.openDFSFile(quantilesFile,job);
+            try{
+                BinStorage loader = new BinStorage();
+                loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+                ArrayList<Tuple> loaded = new ArrayList<Tuple>();
+                for (Tuple t = loader.getNext(); t != null; t = loader.getNext())
+                    loaded.add(t);
+                this.quantiles = loaded.toArray(new Tuple[0]);
+            }finally{
+                is.close(); // the stream used to be left open once the quantiles were read
+            }
+        }catch (IOException e){
+            throw new RuntimeException(e);
+        }
+
+        comparator = job.getOutputKeyComparator();
+    }
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java Thu May 15 18:09:30 2008
@@ -0,0 +1,72 @@
+package org.apache.pig.impl.mapReduceLayer.plans;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSort;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/** Physical-plan visitor that collects the func specs of UDFs used by the filter, generate and sort operators it visits. */
+public class UDFFinder extends PhyPlanVisitor {
+    List<String> UDFs;
+    DepthFirstWalker<PhysicalOperator, PhysicalPlan<PhysicalOperator>> dfw;
+    UDFFinderForExpr udfFinderForExpr;
+    
+    public UDFFinder(){
+        this(null, null);
+    }
+    
+    public UDFFinder(ExprPlan plan,
+            PlanWalker<ExpressionOperator, ExprPlan> walker) {
+        super(plan, walker);
+        UDFs = new ArrayList<String>();
+        dfw = new DepthFirstWalker<PhysicalOperator, PhysicalPlan<PhysicalOperator>>(null);
+        udfFinderForExpr = new UDFFinderForExpr();
+    }
+    /** Returns the live result list; it is cleared by the next setPlan call, so copy it if it must outlive that. */
+    public List<String> getUDFs() {
+        return UDFs;
+    }
+    /** Points this visitor and its depth-first walker at the given plan and discards previously collected UDFs. */
+    public void setPlan(PhysicalPlan<PhysicalOperator> plan){
+        mPlan = plan;
+        dfw.setPlan(plan);
+        mCurrentWalker = dfw;
+        UDFs.clear();
+    }
+    /** Runs the expression-level finder over ep and appends what it found; ep is not checked for null here. */
+    private void addUDFsIn(ExprPlan ep) throws VisitorException{
+        udfFinderForExpr.setPlan(ep);
+        udfFinderForExpr.visit();
+        UDFs.addAll(udfFinderForExpr.getUDFs());
+    }
+
+    @Override
+    public void visitFilter(POFilter op) throws VisitorException {
+        addUDFsIn(op.getPlan());
+    }
+
+    @Override
+    public void visitGenerate(POGenerate op) throws VisitorException {
+        List<ExprPlan> eps = op.getInputPlans();
+        for (ExprPlan ep : eps) {
+            addUDFsIn(ep);
+        }
+    }
+
+    @Override
+    public void visitSort(POSort op) throws VisitorException {
+        if(op.getMSortFunc()!=null)
+            UDFs.add(op.getMSortFunc().getFuncSpec());
+    }
+    
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinderForExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinderForExpr.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinderForExpr.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinderForExpr.java Thu May 15 18:09:30 2008
@@ -0,0 +1,44 @@
+package org.apache.pig.impl.mapReduceLayer.plans;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/** Expression-plan visitor that collects the func spec of every POUserFunc it visits. */
+public class UDFFinderForExpr extends ExprPlanVisitor {
+    List<String> UDFs;
+    DepthFirstWalker<ExpressionOperator, ExprPlan> dfw;
+    public UDFFinderForExpr(){
+        this(null, null);
+    }
+    
+    public UDFFinderForExpr(ExprPlan plan,
+            PlanWalker<ExpressionOperator, ExprPlan> walker) {
+        super(plan, walker);
+        UDFs = new ArrayList<String>();
+        dfw = new DepthFirstWalker<ExpressionOperator, ExprPlan>(null);
+    }
+    /** Records the func spec of the visited user function. */
+    @Override
+    public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+        UDFs.add(userFunc.getFuncSpec());
+    }
+    /** Returns the live result list; it is cleared by the next setPlan call. */
+    public List<String> getUDFs() {
+        return UDFs;
+    }
+    /** Points this visitor and its depth-first walker at the given plan and discards previously collected UDFs. */
+    public void setPlan(ExprPlan plan){
+        mPlan = plan;
+        dfw.setPlan(plan);
+        mCurrentWalker = dfw;
+        UDFs.clear();
+    }
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java Thu May 15 18:09:30 2008
@@ -25,6 +25,7 @@
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCast;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POMapLookUp;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Add;
@@ -109,7 +110,7 @@
     public void visitMod(Mod mod) throws VisitorException {
         //do nothing
     }
-
+    
     public void visitBinCond(POBinCond binCond) {
         // do nothing
         
@@ -119,6 +120,10 @@
         //do nothing
         
     }
+    
+    public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+        //do nothing
+    }
 
 	public void visitMapLookUp(POMapLookUp mapLookUp) {
 		// TODO Auto-generated method stub

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java Thu May 15 18:09:30 2008
@@ -32,10 +32,10 @@
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PODistinct;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
-import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserFunc;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.PlanWalker;
@@ -77,11 +77,11 @@
     }
     
     public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
-        pushWalker(mCurrentWalker.spawnChildWalker((P)lr.getPlan()));
-        // this causes the current walker (the new one we created)
-        // to walk the nested plan
-        visit();
-        popWalker();
+        List<ExprPlan> inpPlans = lr.getPlans();
+        for (ExprPlan plan : inpPlans) {
+            ExprPlanVisitor epv = new ExprPlanVisitor(plan,new DependencyOrderWalker<ExpressionOperator, ExprPlan>(plan));
+            epv.visit();
+        }
     }
     
     public void visitForEach(POForEach fe) throws VisitorException{
@@ -116,24 +116,19 @@
         //do nothing
     }
 
-    public void visitDistinct(PODistinct distinct) throws VisitorException {
-        //do nothing        
-    }
-
-    public void visitRead(PORead read) throws VisitorException {
-        //do nothing        
-    }
+	public void visitDistinct(PODistinct distinct) throws VisitorException {
+        //do nothing		
+	}
+
+	public void visitRead(PORead read) throws VisitorException {
+        //do nothing		
+	}
 
-    public void visitSort(POSort sort) throws VisitorException {
+	public void visitSort(POSort sort) throws VisitorException {
         List<ExprPlan> inpPlans = sort.getSortPlans();
         for (ExprPlan plan : inpPlans) {
             ExprPlanVisitor epv = new ExprPlanVisitor(plan,new DependencyOrderWalker<ExpressionOperator, ExprPlan>(plan));
             epv.visit();
         }
-    }
-
-    public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
-        //do nothing
-    }
-
+	}
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java Thu May 15 18:09:30 2008
@@ -150,7 +150,7 @@
             
         }
         else if(node instanceof POLocalRearrange){
-            sb.append(planString(((POLocalRearrange)node).getPlan()));
+            sb.append(planString(((POLocalRearrange)node).getPlans()));
         }
         else if(node instanceof POSort){
             sb.append(planString(((POSort)node).getSortPlans())); 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java Thu May 15 18:09:30 2008
@@ -146,10 +146,10 @@
             
             res = gen.getNext(t);
             
-            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
+            /*if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
                 return inp;
             if(inp.returnStatus == POStatus.STATUS_NULL)
-                continue;
+                continue;*/
             
             processingPlan = true;
             

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java Thu May 15 18:09:30 2008
@@ -89,7 +89,22 @@
 
     @Override
     public String name() {
-        return "POGenerate - " + mKey.toString();
+        String fString = getFlatStr();
+        return "POGenerate" + "(" + fString + ")" + "  - " + mKey.toString();
+    }
+
+    // Renders the isToBeFlattened flags as a comma-separated string ("" when it is null).
+    private String getFlatStr() {
+        if (isToBeFlattened == null) {
+            return "";
+        }
+        StringBuilder joined = new StringBuilder();
+        for (Boolean flag : isToBeFlattened) {
+            if (joined.length() > 0)
+                joined.append(',');
+            joined.append(flag);
+        }
+        return joined.toString();
     }
 
     /**

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java Thu May 15 18:09:30 2008
@@ -17,19 +17,24 @@
  */
 package org.apache.pig.impl.physicalLayer.topLevelOperators;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
 import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -47,17 +52,12 @@
 
     private Log log = LogFactory.getLog(getClass());
 
-    PhysicalPlan<PhysicalOperator> plan;
+    List<ExprPlan> plans;
+    
+    List<ExpressionOperator> leafOps;
 
     // The position of this LR in the package operator
     int index;
-
-    POGenerate gen;
-    
-    //Since the plan has a generate, this needs to be maintained
-    //as the generate can potentially return multiple tuples for
-    //same call.
-    private boolean processingPlan = false;
     
     byte keyType;
 
@@ -76,6 +76,7 @@
     public POLocalRearrange(OperatorKey k, int rp, List<PhysicalOperator> inp) {
         super(k, rp, inp);
         index = -1;
+        leafOps = new ArrayList<ExpressionOperator>();
     }
 
     @Override
@@ -113,43 +114,18 @@
     @Override
     public void attachInput(Tuple t) {
         super.attachInput(t);
-        processingPlan = false;
     }
     
     /**
      * Calls getNext on the generate operator inside the nested
      * physical plan. Converts the generated tuple into the proper
-     * format, i.e, (key,{(value)})
+     * format, i.e, (key,indexedTuple(value))
      */
     @Override
     public Result getNext(Tuple t) throws ExecException {
-        Result res = null;
+        
         Result inp = null;
-        //The nested plan is under processing
-        //So return tuples that the generate oper
-        //returns after converting them to the required
-        //format
-        if(processingPlan){
-            while(true) {
-                res = gen.getNext(t);
-                if(res.returnStatus==POStatus.STATUS_OK){
-                    res.result = constructLROutput((Tuple)res.result);
-                    return res;
-                }
-                if(res.returnStatus==POStatus.STATUS_ERR)
-                    return res;
-                if(res.returnStatus==POStatus.STATUS_NULL)
-                    continue;
-                if(res.returnStatus==POStatus.STATUS_EOP){
-                    processingPlan = false;
-                    break;
-                }
-            }
-        }
-        //The nested plan processing is done or is
-        //yet to begin. So process the input and start
-        //nested plan processing on the input tuple
-        //read
+        Result res = null;
         while (true) {
             inp = processInput();
             if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
@@ -157,48 +133,80 @@
             if (inp.returnStatus == POStatus.STATUS_NULL)
                 continue;
             
-            plan.attachInput((Tuple) inp.result);
-            
-            res = gen.getNext(t);
-            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
-                break;
-            if(inp.returnStatus == POStatus.STATUS_NULL)
-                continue;
-            
-            processingPlan = true;
-            
-            res.result = constructLROutput((Tuple)res.result);
+            for (ExprPlan ep : plans) {
+                ep.attachInput((Tuple)inp.result);
+            }
+            List<Result> resLst = new ArrayList<Result>();
+            for (ExpressionOperator op : leafOps){
+                
+                switch(op.resultType){
+                case DataType.BAG:
+                    res = op.getNext(dummyBag);
+                    break;
+                case DataType.BOOLEAN:
+                    res = op.getNext(dummyBool);
+                    break;
+                case DataType.BYTEARRAY:
+                    res = op.getNext(dummyDBA);
+                    break;
+                case DataType.CHARARRAY:
+                    res = op.getNext(dummyString);
+                    break;
+                case DataType.DOUBLE:
+                    res = op.getNext(dummyDouble);
+                    break;
+                case DataType.FLOAT:
+                    res = op.getNext(dummyFloat);
+                    break;
+                case DataType.INTEGER:
+                    res = op.getNext(dummyInt);
+                    break;
+                case DataType.LONG:
+                    res = op.getNext(dummyLong);
+                    break;
+                case DataType.MAP:
+                    res = op.getNext(dummyMap);
+                    break;
+                case DataType.TUPLE:
+                    res = op.getNext(dummyTuple);
+                    break;
+                }
+                if(res.returnStatus!=POStatus.STATUS_OK)
+                    return new Result();
+                resLst.add(res);
+            }
+            res.result = constructLROutput(resLst,(Tuple)inp.result);
             return res;
         }
         return inp;
     }
     
-    private Tuple constructLROutput(Tuple genOut){
-        //Strip the input tuple off its key which
-        //will be the first field in the tuple
-        Object key = genOut.getAll().remove(0);
+    private Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
+        //Construct key
+        Object key;
+        if(resLst.size()>1){
+            Tuple t = TupleFactory.getInstance().newTuple(resLst.size());
+            int i=-1;
+            for(Result res : resLst)
+                t.set(++i, res.result);
+            key = t;
+        }
+        else{
+            key = resLst.get(0).result;
+        }
         
         //Create the indexed tuple out of the value
         //that is remaining in the input tuple
-        IndexedTuple it = new IndexedTuple(genOut, index);
+        IndexedTuple it = new IndexedTuple(value, index);
         
         //Put the key and the indexed tuple
         //in a tuple and return
-        Tuple outPut = new DefaultTuple();
-        outPut.append(key);
-        outPut.append(it);
+        Tuple outPut = TupleFactory.getInstance().newTuple(2);
+        outPut.set(0,key);
+        outPut.set(1,it);
         return outPut;
     }
 
-    public PhysicalPlan<PhysicalOperator> getPlan() {
-        return plan;
-    }
-
-    public void setPlan(PhysicalPlan<PhysicalOperator> plan) {
-        this.plan = plan;
-        gen = (POGenerate) plan.getLeaves().get(0);
-    }
-
     public byte getKeyType() {
         return keyType;
     }
@@ -206,4 +214,16 @@
     public void setKeyType(byte keyType) {
         this.keyType = keyType;
     }
+
+    public List<ExprPlan> getPlans() {
+        return plans;
+    }
+
+    // Stores the plans and re-caches the first leaf operator of each one in leafOps.
+    public void setPlans(List<ExprPlan> plans) {
+        this.plans = plans;
+        leafOps.clear();
+        for (ExprPlan exprPlan : plans)
+            leafOps.add(exprPlan.getLeaves().get(0));
+    }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java Thu May 15 18:09:30 2008
@@ -35,6 +35,8 @@
 import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
 import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -51,94 +53,94 @@
  */
 public class POSort extends PhysicalOperator<PhyPlanVisitor> {
 
-    //private List<Integer> mSortCols;
-    private List<ExprPlan> sortPlans;
-    private List<Byte> ExprOutputTypes;
-    private List<Boolean> mAscCols;
-    private POUserComparisonFunc mSortFunc;
-    private final Log log = LogFactory.getLog(getClass());
-
-    private boolean inputsAccumulated = false;
-    public boolean isUDFComparatorUsed = false;
-    private DataBag sortedBag;
-    transient Iterator<Tuple> it;
-
-    public POSort(OperatorKey k, int rp, List inp, List<ExprPlan> sortPlans,
-            List<Boolean> mAscCols, POUserFunc mSortFunc) {
-        super(k, rp, inp);
-        //this.mSortCols = mSortCols;
-        this.sortPlans = sortPlans;
-        this.mAscCols = mAscCols;
-        this.mSortFunc = (POUserComparisonFunc) mSortFunc;
-        if (mSortFunc == null) {
-            sortedBag = BagFactory.getInstance().newSortedBag(
-                    new SortComparator());
-            ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
-
-            for(ExprPlan plan : sortPlans) {
-                ExprOutputTypes.add(plan.getLeaves().get(0).resultType);
-            }
-        } else {
-            sortedBag = BagFactory.getInstance().newSortedBag(
-                    new UDFSortComparator());
-            isUDFComparatorUsed = true;
-        }
-    }
-
-    public POSort(OperatorKey k, int rp, List inp) {
-        super(k, rp, inp);
-
-    }
-
-    public POSort(OperatorKey k, int rp) {
-        super(k, rp);
-
-    }
-
-    public POSort(OperatorKey k, List inp) {
-        super(k, inp);
-
-    }
-
-    public POSort(OperatorKey k) {
-        super(k);
-
-    }
-    
-    public class SortComparator implements Comparator<Tuple> {
-        public int compare(Tuple o1, Tuple o2) {
-            int count = 0;
-            int ret = 0;
-            if(sortPlans == null || sortPlans.size() == 0) 
-                return 0;
-            for(ExprPlan plan : sortPlans) {
-                try {
-                    plan.attachInput(o1);
-                    Result res1 = getResult(plan, ExprOutputTypes.get(count));
-                    plan.attachInput(o2);
-                    Result res2 = getResult(plan, ExprOutputTypes.get(count));
-                    if(res1.returnStatus != POStatus.STATUS_OK || res2.returnStatus != POStatus.STATUS_OK) {
-                        log.error("Error processing the input in the expression plan : " + plan.toString());
-                    } else {
-                        if(mAscCols.get(count ++))
-                            ret = DataType.compare(res1.result, res2.result);
-                        else
-                            ret = DataType.compare(res2.result, res1.result);
-                    }
-                        
-                } catch (ExecException e) {
-                    log.error("Invalid result while executing the expression plan : " + plan.toString() + "\n" + e.getMessage());
-                }
-                
-            }
-            return ret;
-        } 
-        
-        private Result getResult(ExprPlan plan, byte resultType) throws ExecException {
-            ExpressionOperator Op = plan.getLeaves().get(0);
-            Result res = null;
-            
-            switch (resultType) {
+	//private List<Integer> mSortCols;
+	private List<ExprPlan> sortPlans;
+	private List<Byte> ExprOutputTypes;
+	private List<Boolean> mAscCols;
+	private POUserComparisonFunc mSortFunc;
+	private final Log log = LogFactory.getLog(getClass());
+
+	private boolean inputsAccumulated = false;
+	public boolean isUDFComparatorUsed = false;
+	private DataBag sortedBag;
+	transient Iterator<Tuple> it;
+
+	/**
+	 * Builds a sort driven by sortPlans/mAscCols, or by mSortFunc when one is
+	 * given; the comparator of the sorted bag is chosen accordingly.
+	 */
+	public POSort(OperatorKey k, int rp, List inp, List<ExprPlan> sortPlans,
+			List<Boolean> mAscCols, POUserFunc mSortFunc) {
+		super(k, rp, inp);
+		this.sortPlans = sortPlans;
+		this.mAscCols = mAscCols;
+		this.mSortFunc = (POUserComparisonFunc) mSortFunc;
+		if (mSortFunc != null) {
+			sortedBag = BagFactory.getInstance().newSortedBag(new UDFSortComparator());
+			isUDFComparatorUsed = true;
+			return;
+		}
+		sortedBag = BagFactory.getInstance().newSortedBag(new SortComparator());
+		// Cache the result type of each plan's first leaf for SortComparator.
+		ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
+		for(ExprPlan plan : sortPlans)
+			ExprOutputTypes.add(plan.getLeaves().get(0).resultType);
+	}
+
+	public POSort(OperatorKey k, int rp, List inp) {
+		super(k, rp, inp);
+
+	}
+
+	public POSort(OperatorKey k, int rp) {
+		super(k, rp);
+
+	}
+
+	public POSort(OperatorKey k, List inp) {
+		super(k, inp);
+
+	}
+
+	public POSort(OperatorKey k) {
+		super(k);
+
+	}
+	
+	public class SortComparator implements Comparator<Tuple> {
+		/** Orders two tuples by the sort plans in sequence; the first plan whose values differ decides. */
+		public int compare(Tuple o1, Tuple o2) {
+			if(sortPlans == null || sortPlans.size() == 0)
+				return 0;
+			for(int col = 0; col < sortPlans.size(); col++) {
+				// col tracks the plan's position, so a failed plan cannot shift later lookups.
+				ExprPlan plan = sortPlans.get(col);
+				try {
+					plan.attachInput(o1);
+					Result res1 = getResult(plan, ExprOutputTypes.get(col));
+					plan.attachInput(o2);
+					Result res2 = getResult(plan, ExprOutputTypes.get(col));
+					if(res1.returnStatus != POStatus.STATUS_OK || res2.returnStatus != POStatus.STATUS_OK) {
+						log.error("Error processing the input in the expression plan : " + plan.toString());
+						continue;
+					}
+					int ret = mAscCols.get(col)
+							? DataType.compare(res1.result, res2.result)
+							: DataType.compare(res2.result, res1.result);
+					if(ret != 0)
+						return ret;
+				} catch (ExecException e) {
+					log.error("Invalid result while executing the expression plan : " + plan.toString() + "\n" + e.getMessage());
+				}
+			}
+			return 0;
+		}
+		
+		private Result getResult(ExprPlan plan, byte resultType) throws ExecException {
+			ExpressionOperator Op = plan.getLeaves().get(0);
+			Result res = null;
+			
+			switch (resultType) {
             case DataType.BYTEARRAY:
                 res = Op.getNext(dummyDBA);
                 break;
@@ -158,95 +160,107 @@
                 res = Op.getNext(dummyLong);
                 break;
             }
-            return res;
-        }
-    }
-
-    public class UDFSortComparator implements Comparator<Tuple> {
-
-        public int compare(Tuple t1, Tuple t2) {
-
-            mSortFunc.attachInput(t1, t2);
-            Integer i = null;
-            Result res = null;
-            try {
-                res = mSortFunc.getNext(i);
-            } catch (ExecException e) {
-
-                log.error("Input not ready. Error on reading from input. "
-                        + e.getMessage());
-            }
-            if (res != null)
-                return (Integer) res.result;
-            else
-                return 0;
-        }
-
-    }
-
-    @Override
-    public String name() {
+			return res;
+		}
+	}
+
+	/** Comparator that hands both tuples to the user-supplied comparison function. */
+	public class UDFSortComparator implements Comparator<Tuple> {
+
+		/** Returns the function's Integer result, or 0 when no Result was produced. */
+		public int compare(Tuple t1, Tuple t2) {
+			mSortFunc.attachInput(t1, t2);
+			// A typed null selects the getNext(Integer) overload.
+			Integer typeSelector = null;
+			Result outcome;
+			try {
+				outcome = mSortFunc.getNext(typeSelector);
+			} catch (ExecException e) {
+				log.error("Input not ready. Error on reading from input. "
+						+ e.getMessage());
+				outcome = null;
+			}
+			if (outcome == null)
+				return 0;
+			return (Integer) outcome.result;
+		}
+	}
+
+	@Override
+	public String name() {
+
+		return "POSort - " + mKey.toString();
+	}
+
+	@Override
+	public boolean isBlocking() {
+
+		return true;
+	}
+
+	/**
+	 * Drains the input into sortedBag on the first call, then returns one
+	 * tuple from sortedBag per call and STATUS_EOP once it is exhausted.
+	 */
+	@Override
+	public Result getNext(Tuple t) throws ExecException {
+		Result res = new Result();
+		if (!inputsAccumulated) {
+			res = processInput();
+			while (res.returnStatus != POStatus.STATUS_EOP) {
+				if (res.returnStatus == POStatus.STATUS_ERR) {
+					log.error("Error in reading from the inputs");
+					// Propagate: re-testing the same result would spin forever.
+					return res;
+				}
+				// STATUS_NULL carries no tuple; just move on to the next input.
+				if (res.returnStatus != POStatus.STATUS_NULL)
+					sortedBag.add((Tuple) res.result);
+				res = processInput();
+			}
+			inputsAccumulated = true;
+		}
+		if (it == null) {
+			it = sortedBag.iterator();
+		}
+		// Ask hasNext() first so a standard iterator cannot throw at the end.
+		res.result = it.hasNext() ? it.next() : null;
+		res.returnStatus = (res.result == null) ? POStatus.STATUS_EOP : POStatus.STATUS_OK;
+		return res;
+	}
+
+	@Override
+	public boolean supportsMultipleInputs() {
+
+		return false;
+	}
+
+	@Override
+	public boolean supportsMultipleOutputs() {
 
-        return "POSort - " + mKey.toString();
-    }
+		return false;
+	}
 
-    @Override
-    public boolean isBlocking() {
-
-        return true;
-    }
-
-    @Override
-    public Result getNext(Tuple t) throws ExecException {
-        Result res = new Result();
-        if (!inputsAccumulated) {
-            res = processInput();
-            while (res.returnStatus != POStatus.STATUS_EOP) {
-                if (res.returnStatus == POStatus.STATUS_ERR) {
-                    log.error("Error in reading from the inputs");
-                    continue;
-                } else if (res.returnStatus == POStatus.STATUS_NULL) {
-                    continue;
-                }
-                sortedBag.add((Tuple) res.result);
-                res = processInput();
-
-            }
+	@Override
+	public void visit(PhyPlanVisitor v) throws VisitorException {
 
-            inputsAccumulated = true;
+		v.visitSort(this);
+	}
 
-        }
-        if (it == null) {
-            it = sortedBag.iterator();
-        }
-        res.result = it.next();
-        if (res.result == null)
-            res.returnStatus = POStatus.STATUS_EOP;
-        else
-            res.returnStatus = POStatus.STATUS_OK;
-        return res;
-    }
-
-    @Override
-    public boolean supportsMultipleInputs() {
-
-        return false;
+    public List<ExprPlan> getSortPlans() {
+        return sortPlans;
     }
 
-    @Override
-    public boolean supportsMultipleOutputs() {
-
-        return false;
+    public void setSortPlans(List<ExprPlan> sortPlans) {
+        this.sortPlans = sortPlans;
     }
 
-    @Override
-    public void visit(PhyPlanVisitor v) throws VisitorException {
-
-        v.visitSort(this);
+    public POUserComparisonFunc getMSortFunc() {
+        return mSortFunc;
     }
 
-    public List<ExprPlan> getSortPlans() {
-        return sortPlans;
+    public void setMSortFunc(POUserComparisonFunc sortFunc) {
+        mSortFunc = sortFunc;
     }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java Thu May 15 18:09:30 2008
@@ -64,6 +64,8 @@
     //of the translator to set this.
     boolean overloaded = false;
     
+    boolean star = false;
+    
     public POProject(OperatorKey k) {
         this(k,-1,0);
     }
@@ -79,7 +81,8 @@
 
     @Override
     public String name() {
-        return "Project(" + column + ") - " + mKey.toString();
+        
+        return "Project(" + ((star) ? "*" : column) + ") - " + mKey.toString();
     }
 
     @Override
@@ -184,6 +187,9 @@
             res = processInput();
             if(res.returnStatus!=POStatus.STATUS_OK)
                 return res;
+            if(star)
+                return res;
+            
             inpValue = (Tuple)res.result;
             res.result = null;
             
@@ -229,4 +235,12 @@
         this.overloaded = overloaded;
     }
 
+    public boolean isStar() {
+        return star;
+    }
+
+    public void setStar(boolean star) {
+        this.star = star;
+    }
+
 }

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java Thu May 15 18:09:30 2008
@@ -0,0 +1,124 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+
+/**
+ * Expression operator that wraps a user-defined ComparisonFunc.
+ * Only getNext(Integer) produces a comparison; every other getNext
+ * overload logs an error and returns a STATUS_ERR Result.
+ */
+public class POUserComparisonFunc extends POUserFunc {
+
+	transient ComparisonFunc func;
+	private Log log = LogFactory.getLog(getClass());
+
+	public POUserComparisonFunc(OperatorKey k, int rp, List inp, String funcSpec, ComparisonFunc func) {
+		super(k, rp, inp);
+		this.funcSpec = funcSpec;
+		this.func = func;
+	}
+
+	public POUserComparisonFunc(OperatorKey k, int rp, List inp, String funcSpec) {
+		this(k, rp, inp, funcSpec, null);
+		instantiateFunc();
+	}
+
+	private void instantiateFunc() {
+		this.func = (ComparisonFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
+	}
+
+	/** Returns the comparison function, instantiating it from funcSpec if it is still null. */
+	public ComparisonFunc getComparator() {
+		if (func == null)
+			instantiateFunc();
+		return func;
+	}
+
+	/** Compares the attached tuples; the status is STATUS_ERR when either tuple is null. */
+	@Override
+	public Result getNext(Integer i) throws ExecException {
+		Result result = new Result();
+		result.result = getComparator().compare(t1, t2);
+		result.returnStatus = (t1 != null && t2 != null) ? POStatus.STATUS_OK
+				: POStatus.STATUS_ERR;
+		// the two attached tuples are used up now. So we set the
+		// inputAttached flag to false
+		inputAttached = false;
+		return result;
+	}
+
+	/** Non-Integer overloads land here: log, then return STATUS_ERR (not null) so callers can read returnStatus. */
+	private Result getNext() {
+		log.error("getNext being called with non-integer");
+		Result res = new Result();
+		res.returnStatus = POStatus.STATUS_ERR;
+		return res;
+	}
+
+	@Override
+	public Result getNext(Boolean b) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(DataBag db) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(DataByteArray ba) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Double d) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Float f) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Long l) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Map m) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(String s) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Tuple in) throws ExecException {
+		return getNext();
+	}
+
+	/** Stores the pair of tuples that the next getNext(Integer) call compares. */
+	public void attachInput(Tuple t1, Tuple t2) {
+		if (func == null)
+			instantiateFunc();
+		this.t1 = t1;
+		this.t2 = t2;
+		inputAttached = true;
+	}
+}