You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/06/19 21:56:03 UTC

svn commit: r669666 [2/3] - in /incubator/pig/branches/types: src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/local/executionengine/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/logicalLayer...

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java Thu Jun 19 12:56:00 2008
@@ -22,10 +22,10 @@
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 
-public class Launcher {
+public abstract class Launcher {
     private static final Log log = LogFactory.getLog(Launcher.class);
     
-    int totalHadoopTimeSpent;
+    long totalHadoopTimeSpent;
     
     protected Launcher(){
         totalHadoopTimeSpent = 0;
@@ -59,63 +59,15 @@
      * @throws ExecException
      * @throws JobCreationException
      */
-    protected boolean launchPig(PhysicalPlan<PhysicalOperator> php, String grpName, PigContext pc)
+    public abstract boolean launchPig(PhysicalPlan<PhysicalOperator> php, String grpName, PigContext pc)
             throws PlanException, VisitorException, IOException, ExecException,
-            JobCreationException {
-        long sleepTime = 500;
-        MRCompiler comp = new MRCompiler(php, pc);
-        comp.compile();
-        
-        ExecutionEngine exe = pc.getExecutionEngine();
-        Configuration conf = ((HConfiguration)exe.getConfiguration()).getConfiguration();
-        JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
-
-        MROperPlan mrp = comp.getMRPlan();
-        JobControlCompiler jcc = new JobControlCompiler();
-        
-        JobControl jc = jcc.compile(mrp, grpName, conf, pc);
-        
-        new Thread(jc).start();
-
-        int numMRJobs = jc.getWaitingJobs().size();
-        double lastProg = -1;
-        while(!jc.allFinished()){
-            try {
-                Thread.sleep(sleepTime);
-            } catch (InterruptedException e) {}
-            double prog = calculateProgress(jc, jobClient)/numMRJobs;
-            if(prog>lastProg)
-                log.info(prog * 100 + "% complete");
-            lastProg = prog;
-        }
-        lastProg = calculateProgress(jc, jobClient)/numMRJobs;
-        if(isComplete(lastProg))
-            log.info("Completed Successfully");
-        else{
-            log.info("Unsuccessful attempt. Completed " + lastProg * 100 + "% of the job");
-            List<Job> failedJobs = jc.getFailedJobs();
-            if(failedJobs==null)
-                throw new ExecException("Something terribly wrong with Job Control.");
-            for (Job job : failedJobs) {
-                getStats(job,jobClient);
-            }
-        }
-        List<Job> succJobs = jc.getSuccessfulJobs();
-        if(succJobs!=null)
-            for(Job job : succJobs){
-                getStats(job,jobClient);
-            }
-
-        jc.stop(); 
-        
-        return isComplete(lastProg);
-    }
+            JobCreationException;
     
-    private boolean isComplete(double prog){
+    protected boolean isComplete(double prog){
         return (int)(Math.ceil(prog)) == (int)1;
     }
     
-    private void getStats(Job job, JobClient jobClient) throws IOException{
+    protected void getStats(Job job, JobClient jobClient) throws IOException{
         String MRJobID = job.getMapredJobID();
         TaskReport[] mapRep = jobClient.getMapTaskReports(MRJobID);
         getErrorMessages(mapRep, "map");
@@ -125,15 +77,15 @@
         totalHadoopTimeSpent += computeTimeSpent(mapRep);
     }
     
-    private int computeTimeSpent(TaskReport[] mapReports) {
-        int timeSpent = 0;
+    protected long computeTimeSpent(TaskReport[] mapReports) {
+        long timeSpent = 0;
         for (TaskReport r : mapReports) {
             timeSpent += (r.getFinishTime() - r.getStartTime());
         }
         return timeSpent;
     }
     
-    protected static void getErrorMessages(TaskReport reports[], String type)
+    protected void getErrorMessages(TaskReport reports[], String type)
     {
         for (int i = 0; i < reports.length; i++) {
             String msgs[] = reports[i].getDiagnostics();
@@ -154,7 +106,7 @@
      * @return The progress as a precentage in double format
      * @throws IOException
      */
-    protected static double calculateProgress(JobControl jc, JobClient jobClient) throws IOException{
+    protected double calculateProgress(JobControl jc, JobClient jobClient) throws IOException{
         double prog = 0.0;
         prog += jc.getSuccessfulJobs().size();
         
@@ -176,7 +128,7 @@
      * @return Returns the percentage progress of this Job
      * @throws IOException
      */
-    protected static double progressOfRunningJob(Job j, JobClient jobClient) throws IOException{
+    protected double progressOfRunningJob(Job j, JobClient jobClient) throws IOException{
         String mrJobID = j.getMapredJobID();
         RunningJob rj = jobClient.getJob(mrJobID);
         if(rj==null && j.getState()==Job.SUCCESS)
@@ -189,7 +141,7 @@
             return (mapProg + redProg)/2;
         }
     }
-    public int getTotalHadoopTimeSpent() {
+    public long getTotalHadoopTimeSpent() {
         return totalHadoopTimeSpent;
     }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java Thu Jun 19 12:56:00 2008
@@ -1,9 +1,18 @@
 package org.apache.pig.impl.mapReduceLayer;
 
 import java.io.IOException;
+import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.PlanException;
@@ -11,6 +20,8 @@
 
 
 public class LocalLauncher extends Launcher{
+    private static final Log log = LogFactory.getLog(Launcher.class);
+    
     @Override
     public boolean launchPig(PhysicalPlan<PhysicalOperator> php,
                              String grpName,
@@ -19,6 +30,117 @@
                                                    IOException,
                                                    ExecException,
                                                    JobCreationException {
-        return super.launchPig(php, grpName, pc);
+        long sleepTime = 500;
+        MRCompiler comp = new MRCompiler(php, pc);
+        comp.compile();
+        
+        Configuration conf = new Configuration();
+        conf.set("mapred.job.tracker", "local");
+        JobClient jobClient = new JobClient(new JobConf(conf));
+
+        MROperPlan mrp = comp.getMRPlan();
+        JobControlCompiler jcc = new JobControlCompiler();
+        
+        JobControl jc = jcc.compile(mrp, grpName, conf, pc);
+        
+        
+        int numMRJobs = jc.getWaitingJobs().size();
+        
+        new Thread(jc).start();
+
+        double lastProg = -1;
+        while(!jc.allFinished()){
+            try {
+                Thread.sleep(sleepTime);
+            } catch (InterruptedException e) {}
+            double prog = calculateProgress(jc, jobClient)/numMRJobs;
+            if(prog>lastProg)
+                log.info(prog * 100 + "% complete");
+            lastProg = prog;
+        }
+        lastProg = calculateProgress(jc, jobClient)/numMRJobs;
+        if(isComplete(lastProg))
+            log.info("Completed Successfully");
+        else{
+            log.info("Unsuccessful attempt. Completed " + lastProg * 100 + "% of the job");
+            List<Job> failedJobs = jc.getFailedJobs();
+            if(failedJobs==null)
+                throw new ExecException("Something terribly wrong with Job Control.");
+            for (Job job : failedJobs) {
+                getStats(job,jobClient);
+            }
+        }
+        List<Job> succJobs = jc.getSuccessfulJobs();
+        if(succJobs!=null)
+            for(Job job : succJobs){
+                getStats(job,jobClient);
+            }
+
+        jc.stop(); 
+        
+        return isComplete(lastProg);
+    }
+    
+    //A purely testing method. Not to be used elsewhere
+    public boolean launchPigWithCombinePlan(PhysicalPlan<PhysicalOperator> php,
+            String grpName, PigContext pc, PhysicalPlan combinePlan) throws PlanException,
+            VisitorException, IOException, ExecException, JobCreationException {
+        long sleepTime = 500;
+        MRCompiler comp = new MRCompiler(php, pc);
+        comp.compile();
+
+        Configuration conf = new Configuration();
+        conf.set("mapred.job.tracker", "local");
+        JobClient jobClient = new JobClient(new JobConf(conf));
+
+        MROperPlan mrp = comp.getMRPlan();
+        if(mrp.getLeaves().get(0)!=mrp.getRoots().get(0))
+            throw new PlanException("Unsupported configuration to test combine plan");
+        
+        MapReduceOper mro = mrp.getLeaves().get(0);
+        mro.combinePlan = combinePlan;
+        
+        JobControlCompiler jcc = new JobControlCompiler();
+
+        JobControl jc = jcc.compile(mrp, grpName, conf, pc);
+
+        int numMRJobs = jc.getWaitingJobs().size();
+
+        new Thread(jc).start();
+
+        double lastProg = -1;
+        while (!jc.allFinished()) {
+            try {
+                Thread.sleep(sleepTime);
+            } catch (InterruptedException e) {
+            }
+            double prog = calculateProgress(jc, jobClient) / numMRJobs;
+            if (prog > lastProg)
+                log.info(prog * 100 + "% complete");
+            lastProg = prog;
+        }
+        lastProg = calculateProgress(jc, jobClient) / numMRJobs;
+        if (isComplete(lastProg))
+            log.info("Completed Successfully");
+        else {
+            log.info("Unsuccessful attempt. Completed " + lastProg * 100
+                    + "% of the job");
+            List<Job> failedJobs = jc.getFailedJobs();
+            if (failedJobs == null)
+                throw new ExecException(
+                        "Something terribly wrong with Job Control.");
+            for (Job job : failedJobs) {
+                getStats(job, jobClient);
+            }
+        }
+        List<Job> succJobs = jc.getSuccessfulJobs();
+        if (succJobs != null)
+            for (Job job : succJobs) {
+                getStats(job, jobClient);
+            }
+
+        jc.stop();
+
+        return isComplete(lastProg);
     }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java Thu Jun 19 12:56:00 2008
@@ -398,6 +398,7 @@
     
     
     private MapReduceOper endSingleInputPlanWithStr(FileSpec fSpec) throws PlanException{
+        if(compiledInputs.length>1) throw new PlanException("Received a multi input plan when expecting only a single input one.");
         MapReduceOper mro = compiledInputs[0];
         POStore str = getStore();
         str.setSFile(fSpec);
@@ -779,7 +780,7 @@
         POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
         prj.setColumn(1);
         prj.setOverloaded(false);
-        prj.setResultType(DataType.BYTEARRAY);
+        prj.setResultType(DataType.BAG);
         ep.add(prj);
         List<ExprPlan> eps2 = new ArrayList<ExprPlan>();
         eps2.add(ep);
@@ -798,10 +799,13 @@
         return mro;
     }
 
-    public MapReduceOper getQuantileJob(POSort sort, MapReduceOper prevJob, FileSpec lFile, FileSpec quantFile, int rp, int[] fields) throws PlanException, VisitorException {
+    public MapReduceOper getQuantileJob(POSort inpSort, MapReduceOper prevJob, FileSpec lFile, FileSpec quantFile, int rp, int[] fields) throws PlanException, VisitorException {
         FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),RandomSampleLoader.class.getName());
         MapReduceOper mro = startNew(quantLdFilName, prevJob);
         mro.UDFs.add(FindQuantiles.class.getName());
+        POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
+                .getRequestedParallelism(), null, inpSort.getSortPlans(),
+                inpSort.getMAscCols(), inpSort.getMSortFunc());
         if(sort.isUDFComparatorUsed)
             mro.UDFs.add(sort.getMSortFunc().getFuncSpec());
         
@@ -832,6 +836,7 @@
         ExprPlan ep1 = new ExprPlan();
         ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
         ce.setValue("all");
+        ce.setResultType(DataType.CHARARRAY);
         ep1.add(ce);
         
         List<ExprPlan> eps = new ArrayList<ExprPlan>();
@@ -858,8 +863,8 @@
         
         POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
         topPrj.setColumn(1);
-        topPrj.setOverloaded(true);
         topPrj.setResultType(DataType.TUPLE);
+        topPrj.setOverloaded(true);
         fe2Plan.add(topPrj);
         
         ExprPlan nesSortPlan = new ExprPlan();
@@ -872,19 +877,22 @@
         nesSortPlanLst.add(nesSortPlan);
         
         sort.setSortPlans(nesSortPlanLst);
+        sort.setResultType(DataType.TUPLE);
         fe2Plan.add(sort);
         fe2Plan.connect(topPrj, sort);
         
         ExprPlan ep3 = new ExprPlan();
         POProject prjStar3 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        prjStar3.setResultType(DataType.TUPLE);
-        prjStar3.setStar(true);
+        prjStar3.setResultType(DataType.BAG);
+        prjStar3.setColumn(0);
+        prjStar3.setStar(false);
         ep3.add(prjStar3);
         
         ExprPlan rpep = new ExprPlan();
         ConstantExpression rpce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
         rpce.setRequestedParallelism(rp);
         rpce.setValue(rp<=0?1:rp);
+        rpce.setResultType(DataType.INTEGER);
         rpep.add(rpce);
         
         List<ExprPlan> genEps = new ArrayList<ExprPlan>();

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java Thu Jun 19 12:56:00 2008
@@ -1,9 +1,20 @@
 package org.apache.pig.impl.mapReduceLayer;
 
 import java.io.IOException;
+import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.PlanException;
@@ -14,7 +25,7 @@
  *
  */
 public class MapReduceLauncher extends Launcher{
-
+    private static final Log log = LogFactory.getLog(Launcher.class);
     @Override
     public boolean launchPig(PhysicalPlan<PhysicalOperator> php,
                              String grpName,
@@ -23,6 +34,53 @@
                                                    IOException,
                                                    ExecException,
                                                    JobCreationException {
-        return super.launchPig(php, grpName, pc);
+        long sleepTime = 500;
+        MRCompiler comp = new MRCompiler(php, pc);
+        comp.compile();
+        
+        ExecutionEngine exe = pc.getExecutionEngine();
+        Configuration conf = ((HConfiguration)exe.getConfiguration()).getConfiguration();
+        JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
+
+        MROperPlan mrp = comp.getMRPlan();
+        JobControlCompiler jcc = new JobControlCompiler();
+        
+        JobControl jc = jcc.compile(mrp, grpName, conf, pc);
+        
+        int numMRJobs = jc.getWaitingJobs().size();
+        
+        new Thread(jc).start();
+
+        double lastProg = -1;
+        while(!jc.allFinished()){
+            try {
+                Thread.sleep(sleepTime);
+            } catch (InterruptedException e) {}
+            double prog = calculateProgress(jc, jobClient)/numMRJobs;
+            if(prog>lastProg)
+                log.info(prog * 100 + "% complete");
+            lastProg = prog;
+        }
+        lastProg = calculateProgress(jc, jobClient)/numMRJobs;
+        if(isComplete(lastProg))
+            log.info("Completed Successfully");
+        else{
+            log.info("Unsuccessful attempt. Completed " + lastProg * 100 + "% of the job");
+            List<Job> failedJobs = jc.getFailedJobs();
+            if(failedJobs==null)
+                throw new ExecException("Something terribly wrong with Job Control.");
+            for (Job job : failedJobs) {
+                getStats(job,jobClient);
+            }
+        }
+        List<Job> succJobs = jc.getSuccessfulJobs();
+        if(succJobs!=null)
+            for(Job job : succJobs){
+                getStats(job,jobClient);
+            }
+
+        jc.stop(); 
+        
+        return isComplete(lastProg);
     }
 }

Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigCombiner.java?rev=669666&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigCombiner.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigCombiner.java Thu Jun 19 12:56:00 2008
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.TargetedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+/**
+ * This class holds the static Combiner class that is
+ * used by Pig to execute the combine phase of Pig Map
+ * Reduce jobs. The combiner implements Hadoop's Reducer
+ * interface and receives the key and indexed tuples
+ * collected by the map phase.
+ * 
+ * Hadoop groups these key & indexed tuples and passes
+ * each key with an iterator over its IndexedTuples to the
+ * combiner. The deserialized POPackage operator
+ * is used to package the key, List<IndexedTuple> into pigKey, 
+ * Bag<Tuple> where pigKey is of the appropriate pig type and
+ * then the result of the package is attached to the combine
+ * plan which is executed if it is not empty. Each tuple produced
+ * by the combine plan is split into its key and IndexedTuple,
+ * which are collected into the output collector; if the plan is
+ * empty the package result is collected as is.
+ *
+ */
+public class PigCombiner {
+
+    public static JobConf sJobConf = null;
+    
+    public static class Combine extends MapReduceBase
+            implements
+            Reducer<WritableComparable, IndexedTuple, WritableComparable, Writable> {
+        private final Log log = LogFactory.getLog(getClass());
+        
+        //The combine plan
+        private PhysicalPlan<PhysicalOperator> cp;
+        
+        //The POPackage operator used to package
+        //each key and its indexed tuples. It is
+        //obtained through the job conf under
+        //"pig.combine.package", separately from the
+        //combine plan
+        private POPackage pack;
+        
+        ProgressableReporter pigReporter;
+        
+        /**
+         * Configures the combine plan, the POPackage operator
+         * and the progress reporter
+         */
+        @Override
+        public void configure(JobConf jConf) {
+            super.configure(jConf);
+            sJobConf = jConf;
+            try {
+                cp = (PhysicalPlan<PhysicalOperator>) ObjectSerializer.deserialize(jConf
+                        .get("pig.combinePlan"));
+                pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.combine.package"));
+                // To be removed
+                if(cp.isEmpty())
+                    log.debug("Combine Plan empty!");
+                else{
+                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    cp.explain(baos);
+                    log.debug(baos.toString());
+                }
+                // till here
+                
+                long sleepTime = jConf.getLong("pig.reporter.sleep.time", 10000);
+
+                pigReporter = new ProgressableReporter();
+            } catch (IOException e) {
+                log.error(e.getMessage() + "was caused by:");
+                log.error(e.getCause().getMessage());
+            }
+        }
+        
+        /**
+         * Packages the key and List<IndexedTuple> into key, Bag<Tuple> after
+         * converting the Hadoop type key into Pig type. The package result is
+         * collected as is if the combine plan is empty; otherwise it is run through
+         * the plan and each output is collected as a (key, IndexedTuple) pair.
+         */
+        public void reduce(WritableComparable key,
+                Iterator<IndexedTuple> indInp,
+                OutputCollector<WritableComparable, Writable> oc,
+                Reporter reporter) throws IOException {
+            
+            pigReporter.setRep(reporter);
+            
+            Object k = DataType.convertToPigType(key);
+            pack.attachInput(k, indInp);
+            
+            try {
+                Tuple t=null;
+                Result res = pack.getNext(t);
+                if(res.returnStatus==POStatus.STATUS_OK){
+                    Tuple packRes = (Tuple)res.result;
+                    
+                    if(cp.isEmpty()){
+                        oc.collect(null, packRes);
+                        return;
+                    }
+                    
+                    cp.attachInput(packRes);
+
+                    List<PhysicalOperator> leaves = cp.getLeaves();
+                    
+                    PhysicalOperator leaf = leaves.get(0);
+                    while(true){
+                        Result redRes = leaf.getNext(t);
+                        
+                        if(redRes.returnStatus==POStatus.STATUS_OK){
+//                            oc.collect(null, (Tuple)redRes.result);
+                            Tuple tuple = (Tuple)redRes.result;
+                            Object combKey = tuple.get(0);
+                            IndexedTuple it = (IndexedTuple)tuple.get(1);
+                            WritableComparable wcKey = DataType.getWritableComparableTypes(combKey);
+                            oc.collect(wcKey, it);
+                            continue;
+                        }
+                        
+                        if(redRes.returnStatus==POStatus.STATUS_EOP)
+                            return;
+                        
+                        if(redRes.returnStatus==POStatus.STATUS_NULL)
+                            continue;
+                        
+                        if(redRes.returnStatus==POStatus.STATUS_ERR){
+                            IOException ioe = new IOException("Received Error while " +
+                                    "processing the reduce plan.");
+                            throw ioe;
+                        }
+                    }
+                }
+                
+                if(res.returnStatus==POStatus.STATUS_NULL)
+                    return;
+                
+                if(res.returnStatus==POStatus.STATUS_ERR){
+                    IOException ioe = new IOException("Packaging error while processing group");
+                    throw ioe;
+                }
+                    
+                
+            } catch (ExecException e) {
+                IOException ioe = new IOException(e.getMessage());
+                ioe.initCause(e.getCause());
+                throw ioe;
+            }
+        }
+        
+        
+        /**
+         * Will be called once all the intermediate keys and values are
+         * processed, so this is the right place to clear the reporter.
+         */
+        @Override
+        public void close() throws IOException {
+            super.close();
+            /*if(runnableReporter!=null)
+                runnableReporter.setDone(true);*/
+            PhysicalOperator.setReporter(null);
+        }
+    }
+    
+    /*interface MapOutputCollector<K extends WritableComparable, V extends Writable>
+    extends OutputCollector<K, V> {
+
+        public void close() throws IOException;
+
+        public void flush() throws IOException;
+
+    }
+
+    static class DirectMapOutputCollector<K extends WritableComparable, V extends Writable>
+            implements MapOutputCollector<K, V> {
+
+        private RecordWriter<K, V> out = null;
+
+        private Reporter reporter = null;
+
+        @SuppressWarnings("unchecked")
+        public DirectMapOutputCollector(JobConf job, Reporter reporter)
+                throws IOException {
+            this.reporter = reporter;
+            String finalName = job.getOutputPath().toString();
+            FileSystem fs = FileSystem.get(job);
+
+            out = job.getOutputFormat().getRecordWriter(fs, job, finalName,
+                    reporter);
+        }
+
+        public void close() throws IOException {
+            if (this.out != null) {
+                out.close(this.reporter);
+            }
+
+        }
+
+        public void flush() throws IOException {
+            // TODO Auto-generated method stub
+
+        }
+
+        public void collect(K key, V value) throws IOException {
+            System.out.println(value.toString());
+        }
+    }
+    
+    public static void main(String[] args) throws IOException {
+        Random r = new Random();
+        PhysicalPlan<PhysicalOperator> rp = new PhysicalPlan<PhysicalOperator>();
+        POForEach fe = GenPhyOp.topForEachOPWithPlan(1);
+        rp.add(fe);
+        PigMapReduce.Reduce red = new PigMapReduce.Reduce();
+        POPackage pk = GenPhyOp.topPackageOp();
+        pk.setKeyType(DataType.INTEGER);
+        pk.setNumInps(1);
+        boolean[] inner = {false}; 
+        pk.setInner(inner);
+        
+        JobConf jConf = new JobConf();
+        jConf.set("pig.reducePlan", ObjectSerializer.serialize(rp));
+        jConf.set("pig.reduce.package",ObjectSerializer.serialize(pk));
+        jConf.setOutputFormat(PigOutputFormat.class);
+        jConf.setOutputPath(new Path("pigmrtst1"));
+        red.configure(jConf);
+        
+        WritableComparable key = new IntWritable(1);
+        List<IndexedTuple> itLst = new ArrayList<IndexedTuple>();
+        for(int i=0;i<2;i++){
+            Tuple t = TupleFactory.getInstance().newTuple();
+            t.append(GenRandomData.genRandString(r));
+            t.append(1);
+            IndexedTuple it = new IndexedTuple(t,0);
+            itLst.add(it);
+        }
+        red.reduce(key,itLst.iterator(),new DirectMapOutputCollector(jConf,reporter), reporter);
+    }*/
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java Thu Jun 19 12:56:00 2008
@@ -39,16 +39,11 @@
     WritableComparator comparator;
     
     public int getPartition(WritableComparable key, Writable value,
-            int numPartitions) {
-        try{
-            Tuple keyTuple = (Tuple)key;
-            int index = Arrays.binarySearch(quantiles, (Tuple)keyTuple.get(0), comparator);
-            if (index < 0)
-                index = -index-1;
-            return Math.min(index, numPartitions - 1);
-        }catch(ExecException e){
-            throw new RuntimeException(e);
-        }
+            int numPartitions){
+        int index = Arrays.binarySearch(quantiles, key, comparator);
+        if (index < 0)
+            index = -index-1;
+        return Math.min(index, numPartitions - 1);
     }
 
     public void configure(JobConf job) {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java Thu Jun 19 12:56:00 2008
@@ -33,4 +33,11 @@
         returnStatus = POStatus.STATUS_ERR;
         result = null;
     }
+
+    @Override
+    public String toString() {
+        return (result!=null)?result.toString():"NULL";
+    }
+    
+    
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Add.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Add.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Add.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Add.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
@@ -46,7 +47,7 @@
 
     @Override
     public String name() {
-        return "Add - " + mKey.toString();
+        return "Add" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryExpressionOperator.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryExpressionOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryExpressionOperator.java Thu Jun 19 12:56:00 2008
@@ -60,9 +60,4 @@
     public void setRhs(ExpressionOperator rhs) {
         this.rhs = rhs;
     }
-
-    // TODO Don't we need something here that hooks lhs and rhs to our inputs in
-    // the plan?  Extenders of this class, such as Add, are depending on lhs and
-    // rhs being set.  LogToPhyTranslator is setting inputs.  I don't see
-    // anywhere connecting them together.
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Divide.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Divide.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Divide.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Divide.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
@@ -46,7 +47,7 @@
 
     @Override
     public String name() {
-        return "Divide - " + mKey.toString();
+        return "Divide" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java Thu Jun 19 12:56:00 2008
@@ -55,7 +55,7 @@
 
     @Override
     public String name() {
-        return "Equal To - " + mKey.toString();
+        return "Equal To" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java Thu Jun 19 12:56:00 2008
@@ -20,6 +20,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
@@ -51,7 +52,7 @@
 
     @Override
     public String name() {
-        return "Greater Than or Equal - " + mKey.toString();
+        return "Greater Than or Equal" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
     
     @Override
@@ -77,7 +78,7 @@
         right = (DataByteArray)res.result;
         
         int ret = left.compareTo(right);
-        if(ret==-1 || ret==0){
+        if(ret==1 || ret==0){
             res.result = new Boolean(true);
             //left = right = null;
             return res;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java Thu Jun 19 12:56:00 2008
@@ -20,6 +20,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
 import org.apache.pig.impl.physicalLayer.POStatus;
@@ -45,7 +46,7 @@
 
     @Override
     public String name() {
-        return "Greater Than - " + mKey.toString();
+        return "Greater Than" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override
@@ -75,7 +76,7 @@
         right = (DataByteArray) res.result;
 
         int ret = left.compareTo(right);
-        if (ret == -1) {
+        if (ret == 1) {
             res.result = new Boolean(true);
             // left = right = null;
             return res;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java Thu Jun 19 12:56:00 2008
@@ -20,6 +20,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
@@ -49,7 +50,7 @@
 
     @Override
     public String name() {
-        return "Less Than or Equal - " + mKey.toString();
+        return "Less Than or Equal" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override
@@ -74,7 +75,7 @@
         right = (DataByteArray) res.result;
 
         int ret = left.compareTo(right);
-        if (ret == 1 || ret == 0) {
+        if (ret == -1 || ret == 0) {
             res.result = new Boolean(true);
             //left = right = null;
             return res;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java Thu Jun 19 12:56:00 2008
@@ -20,6 +20,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
@@ -50,7 +51,7 @@
 
     @Override
     public String name() {
-        return "Less Than - " + mKey.toString();
+        return "Less Than" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override
@@ -75,7 +76,7 @@
         right = (DataByteArray) res.result;
 
         int ret = left.compareTo(right);
-        if (ret == 1) {
+        if (ret == -1) {
             res.result = new Boolean(true);
             //left = right = null;
             return res;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Mod.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Mod.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Mod.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Mod.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
@@ -46,7 +47,7 @@
 
     @Override
     public String name() {
-        return "Mod - " + mKey.toString();
+        return "Mod" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
     
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Multiply.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Multiply.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Multiply.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Multiply.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
@@ -46,7 +47,7 @@
 
     @Override
     public String name() {
-        return "Multiply - " + mKey.toString();
+        return "Multiply" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java Thu Jun 19 12:56:00 2008
@@ -55,7 +55,7 @@
 
     @Override
     public String name() {
-        return "Not Equal To - " + mKey.toString();
+        return "Not Equal To" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
@@ -50,7 +51,7 @@
 
     @Override
     public String name() {
-        return "And - " + mKey.toString();
+        return "And" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POBinCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POBinCond.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POBinCond.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POBinCond.java Thu Jun 19 12:56:00 2008
@@ -22,6 +22,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.Result;
@@ -124,7 +125,7 @@
 
     @Override
     public String name() {
-        return "POBinCond - " + mKey.toString();
+        return "POBinCond" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
     
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -28,6 +29,7 @@
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.POStatus;
@@ -41,8 +43,8 @@
  * Need the full operator implementation.
  */
 public class POCast extends ExpressionOperator {
-
-	LoadFunc load;
+    private String loadFSpec;
+	transient private LoadFunc load;
 	private Log log = LogFactory.getLog(getClass());
 	
     private static final long serialVersionUID = 1L;
@@ -57,8 +59,14 @@
         // TODO Auto-generated constructor stub
     }
     
-    public void setLoad(LoadFunc load) {
-    	this.load = load;
+    private void instantiateFunc() { // builds the transient LoadFunc from loadFSpec; called by setLoadFSpec and readObject
+        if(load!=null || loadFSpec==null) return; // nothing to do: already built, or no spec was ever set (readObject calls this unconditionally)
+        this.load = (LoadFunc) PigContext.instantiateFuncFromSpec(this.loadFSpec);
+    }
+    
+    public void setLoadFSpec(String fSpec) {
+    	this.loadFSpec = fSpec;
+        instantiateFunc();
     }
 
     @Override
@@ -69,7 +77,7 @@
 
     @Override
     public String name() {
-        return "Cast - " + mKey.toString();
+        return "Cast" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override
@@ -740,5 +748,10 @@
         return res;
     }
     
+    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
+        is.defaultReadObject();
+        instantiateFunc();
+    }
+    
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java Thu Jun 19 12:56:00 2008
@@ -55,7 +55,7 @@
     @Override
     public String name() {
         // TODO Auto-generated method stub
-        return "POIsNull - " + mKey.toString();
+        return "POIsNull" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override
@@ -187,11 +187,4 @@
         }
         return res;
     }
-    
-    public void setInput(ExpressionOperator in) {
-        this.expr = in;
-    }
-    
-    
-
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POMapLookUp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POMapLookUp.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POMapLookUp.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POMapLookUp.java Thu Jun 19 12:56:00 2008
@@ -22,6 +22,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
@@ -63,7 +64,7 @@
 	@Override
 	public String name() {
 		// TODO Auto-generated method stub
-		return "POMapLookUp - " + mKey.toString();
+		return "POMapLookUp" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
 	}
 
 	@Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
@@ -49,7 +50,7 @@
     @Override
     public String name() {
         // TODO Auto-generated method stub
-        return "PONegative - " + mKey.toString();
+        return "PONegative" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override
@@ -87,11 +88,4 @@
         }
         return res;
     }
-    
-    public void setInput(ExpressionOperator in) {
-        this.expr = in;
-    }
-    
-    
-
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
@@ -50,7 +51,7 @@
 
     @Override
     public String name() {
-        return "Not - " + mKey.toString();
+        return "Not" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
@@ -50,7 +51,7 @@
 
     @Override
     public String name() {
-        return "Or - " + mKey.toString();
+        return "Or" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java Thu Jun 19 12:56:00 2008
@@ -23,6 +23,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
@@ -78,7 +79,7 @@
     @Override
     public String name() {
         
-        return "Project(" + ((star) ? "*" : column) + ") - " + mKey.toString();
+        return "Project" + "[" + DataType.findTypeName(resultType) + "]" +"(" + ((star) ? "*" : column) + ") - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserComparisonFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserComparisonFunc.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserComparisonFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserComparisonFunc.java Thu Jun 19 12:56:00 2008
@@ -1,5 +1,7 @@
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.util.List;
 import java.util.Map;
 
@@ -27,13 +29,13 @@
 	public POUserComparisonFunc(OperatorKey k, int rp, List inp, String funcSpec, ComparisonFunc func) {
 		super(k, rp, inp);
 		this.funcSpec = funcSpec;
-		this.func = func;		
+		this.func = func;
+        if(func==null)
+            instantiateFunc();
 	}
 	
 	public POUserComparisonFunc(OperatorKey k, int rp, List inp, String funcSpec) {
 		this(k, rp, inp, funcSpec, null);
-		
-		instantiateFunc();
 	}
 	
 	private void instantiateFunc() {
@@ -42,8 +44,6 @@
 	}
 	
 	public ComparisonFunc getComparator() {
-		if (func == null)
-			instantiateFunc();
 		return func;
 	}
 	
@@ -51,10 +51,6 @@
 	public Result getNext(Integer i) throws ExecException {
 		Result result = new Result();
 
-		if (func == null)
-			instantiateFunc();
-
-
 		result.result = func.compare(t1, t2);
 		result.returnStatus = (t1 != null && t2 != null) ? POStatus.STATUS_OK
 				: POStatus.STATUS_ERR;
@@ -117,13 +113,15 @@
 	}
 
 	public void attachInput(Tuple t1, Tuple t2) {
-		if (func == null)
-			instantiateFunc();
-
 		this.t1 = t1;
 		this.t2 = t2;
 		inputAttached = true;
 
 	}
+    
+    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
+        is.defaultReadObject();
+        instantiateFunc();
+    }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java Thu Jun 19 12:56:00 2008
@@ -19,6 +19,7 @@
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.lang.reflect.Type;
 import java.util.List;
 import java.util.Map;
@@ -52,9 +53,10 @@
 	Tuple t1, t2;
 	private final Log log = LogFactory.getLog(getClass());
 	String funcSpec;
-	private final byte INITIAL = 0;
-	private final byte INTERMEDIATE = 1;
-	private final byte FINAL = 2;
+    String origFSpec;
+	public static final byte INITIAL = 0;
+	public static final byte INTERMEDIATE = 1;
+	public static final byte FINAL = 2;
 
 	public POUserFunc(OperatorKey k, int rp, List inp) {
 		super(k, rp);
@@ -64,8 +66,6 @@
 
 	public POUserFunc(OperatorKey k, int rp, List inp, String funcSpec) {
 		this(k, rp, inp, funcSpec, null);
-
-		instantiateFunc();
 	}
 	
 	public POUserFunc(OperatorKey k, int rp, List inp, String funcSpec, EvalFunc func) {
@@ -73,11 +73,13 @@
         super(k, rp);
         super.setInputs(inp);
 		this.funcSpec = funcSpec;
+        this.origFSpec = funcSpec;
 		this.func = func;
+        instantiateFunc(funcSpec);
 	}
 
-	private void instantiateFunc() {
-		this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
+	private void instantiateFunc(String fSpec) {
+		this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
         this.func.setReporter(reporter);
 	}
 	
@@ -140,7 +142,6 @@
                 if(temp.returnStatus!=POStatus.STATUS_OK)
                     return temp;
                 ((Tuple)res.result).append(temp.result);
-                
 			}
 			res.returnStatus = temp.returnStatus;
 			return res;
@@ -148,13 +149,8 @@
 	}
 
 	private Result getNext() throws ExecException {
-		Tuple t = null;
-		Result result;
-		// instantiate the function if its null
-		if (func == null)
-			instantiateFunc();
-
-		result = processInput();
+        Result result = processInput();
+        
 		try {
 			if(result.returnStatus == POStatus.STATUS_OK) {
 				result.result = func.exec((Tuple) result.result);
@@ -239,25 +235,22 @@
 		// func is being changed.
 		switch (Function) {
 		case INITIAL:
-			func = (EvalFunc) PigContext.instantiateFuncFromSpec(getInitial());
-			setResultType(DataType.findType(((EvalFunc) func).getReturnType()));
+            funcSpec = getInitial();
 			break;
 		case INTERMEDIATE:
-			func = (EvalFunc) PigContext.instantiateFuncFromSpec(getIntermed());
-			setResultType(DataType.findType(((EvalFunc) func).getReturnType()));
+            funcSpec = getIntermed();
 			break;
 		case FINAL:
-			func = (EvalFunc) PigContext.instantiateFuncFromSpec(getFinal());
-			setResultType(DataType.findType(((EvalFunc) func).getReturnType()));
+            funcSpec = getFinal();
 			break;
 
 		}
+        instantiateFunc(funcSpec);
+        setResultType(DataType.findType(((EvalFunc) func).getReturnType()));
 	}
 
 	public String getInitial() {
-		if (func == null)
-			instantiateFunc();
-
+	    instantiateFunc(origFSpec);
 		if (func instanceof Algebraic) {
 			return ((Algebraic) func).getInitial();
 		} else {
@@ -268,9 +261,7 @@
 	}
 
 	public String getIntermed() {
-		if (func == null)
-			instantiateFunc();
-
+        instantiateFunc(origFSpec);
 		if (func instanceof Algebraic) {
 			return ((Algebraic) func).getIntermed();
 		} else {
@@ -281,9 +272,7 @@
 	}
 
 	public String getFinal() {
-		if (func == null)
-			instantiateFunc();
-
+        instantiateFunc(origFSpec);
 		if (func instanceof Algebraic) {
 			return ((Algebraic) func).getFinal();
 		} else {
@@ -294,39 +283,24 @@
 	}
 
 	public Type getReturnType() {
-		if (func == null)
-			instantiateFunc();
-
 		return func.getReturnType();
 	}
 
 	public void finish() {
-		if (func == null)
-			instantiateFunc();
-
 		func.finish();
 	}
 
 	public Schema outputSchema(Schema input) {
-		if (func == null)
-			instantiateFunc();
-
 		return func.outputSchema(input);
 	}
 
 	public Boolean isAsynchronous() {
-		if (func == null)
-			instantiateFunc();
-		
 		return func.isAsynchronous();
 	}
 
 	@Override
 	public String name() {
-	    if(funcSpec!=null)
-	        return "POUserFunc" + "(" + funcSpec + ")" + " - " + mKey.toString();
-        else
-            return "POUserFunc" + "(" + "DummySpec" + ")" + " - " + mKey.toString();
+	    return "POUserFunc" + "(" + func.getClass().getName() + ")" + " - " + mKey.toString();
 	}
 
 	@Override
@@ -350,5 +324,9 @@
     public String getFuncSpec() {
         return funcSpec;
     }
-
+    
+    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
+        is.defaultReadObject();
+        instantiateFunc(funcSpec);
+    }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Subtract.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Subtract.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Subtract.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Subtract.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
@@ -46,7 +47,7 @@
 
     @Override
     public String name() {
-        return "Subtract - " + mKey.toString();
+        return "Subtract" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PODistinct.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PODistinct.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PODistinct.java Thu Jun 19 12:56:00 2008
@@ -26,6 +26,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
@@ -104,7 +105,7 @@
     @Override
     public String name() {
         // TODO Auto-generated method stub
-        return "PODistinct - " + mKey.toString();
+        return "PODistinct" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POFilter.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POFilter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POFilter.java Thu Jun 19 12:56:00 2008
@@ -146,7 +146,7 @@
 
     @Override
     public String name() {
-        return "Filter - " + mKey.toString();
+        return "Filter" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java Thu Jun 19 12:56:00 2008
@@ -23,6 +23,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.DefaultBagFactory;
 import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.IndexedTuple;
@@ -81,7 +82,7 @@
 
     @Override
     public String name() {
-        return "For Each - " + mKey.toString();
+        return "For Each" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGlobalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGlobalRearrange.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGlobalRearrange.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGlobalRearrange.java Thu Jun 19 12:56:00 2008
@@ -20,6 +20,7 @@
 import java.util.List;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
@@ -63,7 +64,7 @@
 
     @Override
     public String name() {
-        return "Global Rearrange - " + mKey.toString();
+        return "Global Rearrange" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLocalRearrange.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLocalRearrange.java Thu Jun 19 12:56:00 2008
@@ -87,7 +87,7 @@
 
     @Override
     public String name() {
-        return "Local Rearrange - " + mKey.toString();
+        return "Local Rearrange" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POPackage.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POPackage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POPackage.java Thu Jun 19 12:56:00 2008
@@ -25,6 +25,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -93,7 +94,7 @@
 
     @Override
     public String name() {
-        return "Package - " + mKey.toString();
+        return "Package" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java Thu Jun 19 12:56:00 2008
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.impl.physicalLayer.relationalOperators;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -29,6 +30,7 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.POStatus;
@@ -54,7 +56,11 @@
  */
 public class POSort extends PhysicalOperator<PhyPlanVisitor> {
 
-	//private List<Integer> mSortCols;
+	/**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    //private List<Integer> mSortCols;
 	private List<ExprPlan> sortPlans;
 	private List<Byte> ExprOutputTypes;
 	private List<Boolean> mAscCols;
@@ -108,8 +114,13 @@
 
 	}
 	
-	public class SortComparator implements Comparator<Tuple> {
-		public int compare(Tuple o1, Tuple o2) {
+	public class SortComparator implements Comparator<Tuple>,Serializable {
+		/**
+         * 
+         */
+        private static final long serialVersionUID = 1L;
+
+        public int compare(Tuple o1, Tuple o2) {
 			int count = 0;
 			int ret = 0;
 			if(sortPlans == null || sortPlans.size() == 0) 
@@ -165,9 +176,14 @@
 		}
 	}
 
-	public class UDFSortComparator implements Comparator<Tuple> {
+	public class UDFSortComparator implements Comparator<Tuple>,Serializable {
+
+		/**
+         * 
+         */
+        private static final long serialVersionUID = 1L;
 
-		public int compare(Tuple t1, Tuple t2) {
+        public int compare(Tuple t1, Tuple t2) {
 
 			mSortFunc.attachInput(t1, t2);
 			Integer i = null;
@@ -190,7 +206,7 @@
 	@Override
 	public String name() {
 
-		return "POSort - " + mKey.toString();
+		return "POSort" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
 	}
 
 	@Override
@@ -220,13 +236,13 @@
 
 		}
 		if (it == null) {
-			it = sortedBag.iterator();
-		}
-		res.result = it.next();
-		if (res.result == null)
-			res.returnStatus = POStatus.STATUS_EOP;
-		else
-			res.returnStatus = POStatus.STATUS_OK;
+            it = sortedBag.iterator();
+        }
+        if (it.hasNext()) {
+            res.result = it.next();
+            res.returnStatus = POStatus.STATUS_OK;
+        } else
+            res.returnStatus = POStatus.STATUS_EOP;
 		return res;
 	}
 
@@ -264,4 +280,8 @@
         mSortFunc = sortFunc;
     }
 
+    public List<Boolean> getMAscCols() {
+        return mAscCols;
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POUnion.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POUnion.java Thu Jun 19 12:56:00 2008
@@ -21,6 +21,7 @@
 import java.util.List;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
@@ -82,7 +83,7 @@
 
     @Override
     public String name() {
-        return "Union - " + mKey.toString();
+        return "Union" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
     @Override

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Thu Jun 19 12:56:00 2008
@@ -270,6 +270,7 @@
         pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;");
         
         Iterator<Tuple> iter = pigServer.openIterator("answer");
+        if(!iter.hasNext()) fail("No Output received");
         while(iter.hasNext()){
             Tuple t = iter.next();
             assertEquals(expectedResults.get(t.get(1).toString()).doubleValue(),(DataType.toDouble(t.get(0))).doubleValue());

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java Thu Jun 19 12:56:00 2008
@@ -84,7 +84,7 @@
             lt.setValue(inpba1);
             rt.setValue(inpba2);
             Result resba = g.getNext(inpba1);
-            boolean retba = (inpba1.compareTo(inpba2) == -1 || inpba1
+            boolean retba = (inpba1.compareTo(inpba2) == 1 || inpba1
                     .compareTo(inpba2) == 0);
             if ((Boolean) resba.result == retba)
                 return true;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java Thu Jun 19 12:56:00 2008
@@ -84,7 +84,7 @@
             lt.setValue(inpba1);
             rt.setValue(inpba2);
             Result resba = g.getNext(inpba1);
-            boolean retba = (inpba1.compareTo(inpba2) == -1);
+            boolean retba = (inpba1.compareTo(inpba2) == 1);
             if ((Boolean) resba.result == retba)
                 return true;
             return false;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java Thu Jun 19 12:56:00 2008
@@ -84,7 +84,7 @@
             lt.setValue(inpba1);
             rt.setValue(inpba2);
             Result resba = g.getNext(inpba1);
-            boolean retba = (inpba1.compareTo(inpba2) == 1 || inpba1
+            boolean retba = (inpba1.compareTo(inpba2) == -1 || inpba1
                     .compareTo(inpba2) == 0);
             if ((Boolean) resba.result == retba)
                 return true;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java Thu Jun 19 12:56:00 2008
@@ -84,7 +84,7 @@
             lt.setValue(inpba1);
             rt.setValue(inpba2);
             Result resba = g.getNext(inpba1);
-            boolean retba = (inpba1.compareTo(inpba2) == 1);
+            boolean retba = (inpba1.compareTo(inpba2) == -1);
             if ((Boolean) resba.result == retba)
                 return true;
             return false;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestNull.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestNull.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestNull.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestNull.java Thu Jun 19 12:56:00 2008
@@ -51,7 +51,7 @@
         ConstantExpression lt = (ConstantExpression) GenPhyOp.exprConst();
         lt.setResultType(type);
         POIsNull isNullExpr = (POIsNull) GenPhyOp.compIsNullExpr();
-        isNullExpr.setInput(lt);
+        isNullExpr.setExpr(lt);
 
         Object inp1;
         Result res;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java Thu Jun 19 12:56:00 2008
@@ -662,7 +662,7 @@
 	public void testByteArrayToOther() throws PlanException, ExecException {
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		LoadFunc load = new TestLoader();
-		op.setLoad(load);
+		op.setLoadFSpec(load.getClass().getName());
 		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
 		ExprPlan plan = new ExprPlan();
 		plan.add(prj);

Modified: incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Arithmetic.gld
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Arithmetic.gld?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Arithmetic.gld (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Arithmetic.gld Thu Jun 19 12:56:00 2008
@@ -1,26 +1,26 @@
-For Each - Test-Plan-Builder-88
+For Each[tuple] - Test-Plan-Builder-88
 |   |
 |   POGenerate(false,false,false)  - Test-Plan-Builder-87
 |   |   |
-|   |   Add - Test-Plan-Builder-80
+|   |   Add[Unknown] - Test-Plan-Builder-80
 |   |   |
-|   |   |---Add - Test-Plan-Builder-78
+|   |   |---Add[Unknown] - Test-Plan-Builder-78
 |   |   |   |
-|   |   |   |---Project(0) - Test-Plan-Builder-76
+|   |   |   |---Project[bytearray](0) - Test-Plan-Builder-76
 |   |   |   |
-|   |   |   |---Project(1) - Test-Plan-Builder-77
+|   |   |   |---Project[bytearray](1) - Test-Plan-Builder-77
 |   |   |
 |   |   |---Constant(5) - Test-Plan-Builder-79
 |   |   |
-|   |   Subtract - Test-Plan-Builder-85
+|   |   Subtract[Unknown] - Test-Plan-Builder-85
 |   |   |
-|   |   |---Subtract - Test-Plan-Builder-83
+|   |   |---Subtract[Unknown] - Test-Plan-Builder-83
 |   |   |   |
-|   |   |   |---Project(0) - Test-Plan-Builder-81
+|   |   |   |---Project[bytearray](0) - Test-Plan-Builder-81
 |   |   |   |
 |   |   |   |---Constant(5) - Test-Plan-Builder-82
 |   |   |
-|   |   |---Project(1) - Test-Plan-Builder-84
+|   |   |---Project[bytearray](1) - Test-Plan-Builder-84
 |   |   |
 |   |   Constant(hello) - Test-Plan-Builder-86
 |

Modified: incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/BinCond.gld
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/BinCond.gld?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/BinCond.gld (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/BinCond.gld Thu Jun 19 12:56:00 2008
@@ -1,25 +1,25 @@
-For Each - Test-Plan-Builder-140
+For Each[tuple] - Test-Plan-Builder-140
 |   |
 |   POGenerate(false)  - Test-Plan-Builder-139
 |   |   |
-|   |   POBinCond - Test-Plan-Builder-138
+|   |   POBinCond[Unknown] - Test-Plan-Builder-138
 |   |   |
-|   |   |---Equal To - Test-Plan-Builder-131
+|   |   |---Equal To[tuple] - Test-Plan-Builder-131
 |   |   |   |
-|   |   |   |---Project(1) - Test-Plan-Builder-129
+|   |   |   |---Project[bytearray](1) - Test-Plan-Builder-129
 |   |   |   |
 |   |   |   |---Constant(3) - Test-Plan-Builder-130
 |   |   |
-|   |   |---Add - Test-Plan-Builder-134
+|   |   |---Add[Unknown] - Test-Plan-Builder-134
 |   |   |   |
-|   |   |   |---Project(2) - Test-Plan-Builder-132
+|   |   |   |---Project[bytearray](2) - Test-Plan-Builder-132
 |   |   |   |
-|   |   |   |---Project(3) - Test-Plan-Builder-133
+|   |   |   |---Project[bytearray](3) - Test-Plan-Builder-133
 |   |   |
-|   |   |---Subtract - Test-Plan-Builder-137
+|   |   |---Subtract[Unknown] - Test-Plan-Builder-137
 |   |       |
-|   |       |---Project(2) - Test-Plan-Builder-135
+|   |       |---Project[bytearray](2) - Test-Plan-Builder-135
 |   |       |
-|   |       |---Project(3) - Test-Plan-Builder-136
+|   |       |---Project[bytearray](3) - Test-Plan-Builder-136
 |
 |---Load(a:org.apache.pig.builtin.PigStorage()) - Test-Plan-Builder-128
\ No newline at end of file