You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/07/10 00:20:46 UTC

svn commit: r675362 - in /incubator/pig/branches/types: src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/local/executionengine/ src/org/apache/pig/data/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/logicalLayer/...

Author: gates
Date: Wed Jul  9 15:20:46 2008
New Revision: 675362

URL: http://svn.apache.org/viewvc?rev=675362&view=rev
Log:
PIG-292 Fixes many sort issues.  Primary among them is that DataByteArray.compareTo now works properly and keyComparatorClass for the Hadoop job is
now chosen correctly in order by cases.


Modified:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Jul  9 15:20:46 2008
@@ -235,6 +235,8 @@
     public ExecJob execute(PhysicalPlan plan,
                            String jobName) throws ExecException {
         try {
+            FileSpec spec = checkLeafIsStore(plan);
+            /*
             PhysicalOperator leaf = (PhysicalOperator)plan.getLeaves().get(0);
             FileSpec spec = null;
             if(!(leaf instanceof POStore)){
@@ -251,6 +253,7 @@
             else{
                 spec = ((POStore)leaf).getSFile();
             }
+            */
 
             MapReduceLauncher launcher = new MapReduceLauncher();
             boolean success = launcher.launchPig(plan, jobName, pigContext);
@@ -277,8 +280,13 @@
         try {
             PlanPrinter printer = new PlanPrinter(plan);
             printer.visit();
-            System.out.println();
-        } catch (VisitorException ve) {
+            stream.println();
+
+            checkLeafIsStore(plan);
+
+            MapReduceLauncher launcher = new MapReduceLauncher();
+            launcher.explain(plan, pigContext, stream);
+        } catch (Exception ve) {
             throw new RuntimeException(ve);
         }
     }
@@ -459,7 +467,31 @@
         InetAddress.getByName(parts[0]);
         return parts[0] + ":" + parts[1];
     }
-    
+
+    /**
+     * Ensures the physical plan ends in a store and returns the FileSpec its
+     * output will be written to.
+     *
+     * Only the first leaf of the plan is examined.  If it is already a
+     * POStore, that store's FileSpec is returned unchanged.  Otherwise a new
+     * POStore (in the leaf's scope, with a freshly generated node id) is
+     * created that writes to a temporary path using BinStorage, and it is
+     * added to the plan as a new leaf, i.e. this method MUTATES the plan.
+     *
+     * NOTE(review): explain() also calls this, so explaining a plan leaves a
+     * temporary store attached to it; confirm that side effect is intended.
+     * NOTE(review): plan.getLeaves().get(0) assumes the plan has at least one
+     * leaf; otherwise the resulting runtime exception surfaces wrapped in an
+     * ExecException.
+     * NOTE(review): the catch-all below would also re-wrap any ExecException
+     * raised inside the try, adding a redundant level of nesting.
+     *
+     * @param plan physical plan to inspect (and possibly extend)
+     * @return FileSpec of the existing or newly added store
+     * @throws ExecException wrapping any exception raised while inspecting
+     *         or modifying the plan
+     */
+    private FileSpec checkLeafIsStore(PhysicalPlan plan) throws ExecException {
+        try {
+            // Only the first leaf is considered.
+            PhysicalOperator leaf = (PhysicalOperator)plan.getLeaves().get(0);
+            FileSpec spec = null;
+            if(!(leaf instanceof POStore)){
+                // No store at the end: synthesize one writing BinStorage
+                // output to a temporary location.
+                String scope = leaf.getOperatorKey().getScope();
+                POStore str = new POStore(new OperatorKey(scope,
+                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+                str.setPc(pigContext);
+                spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
+                    pigContext).toString(),
+                    BinStorage.class.getName());
+                str.setSFile(spec);
+                // Side effect: the caller's plan gains a new leaf.
+                plan.addAsLeaf(str);
+            } else{
+                spec = ((POStore)leaf).getSFile();
+            }
+            return spec;
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+   
 }
 
 

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Wed Jul  9 15:20:46 2008
@@ -50,6 +50,7 @@
 import org.apache.pig.impl.mapReduceLayer.LocalLauncher;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.plans.PlanPrinter;
 import org.apache.pig.impl.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.plan.VisitorException;
 import java.util.Iterator;
@@ -163,7 +164,16 @@
     }
 
+    /**
+     * Explains how the given physical plan will run in local mode.  It runs
+     * a PlanPrinter over the plan, writes a blank line to {@code stream},
+     * and then appends LocalLauncher's explanation of the plan to
+     * {@code stream}.
+     *
+     * NOTE(review): PlanPrinter is constructed from the plan only and is never
+     * handed {@code stream}; confirm whether its output actually goes to
+     * {@code stream} or to standard output.
+     * NOTE(review): unlike the Hadoop engine's explain, this does not first
+     * ensure the plan's leaf is a POStore.
+     *
+     * @param plan physical plan to explain
+     * @param stream destination for the explanation
+     * @throws RuntimeException wrapping any exception raised while explaining
+     */
     public void explain(PhysicalPlan plan, PrintStream stream) {
-        // TODO FIX
+        try {
+            PlanPrinter printer = new PlanPrinter(plan);
+            printer.visit();
+            stream.println();
+
+            // Let the local launcher describe how the plan will be compiled
+            // and executed.
+            LocalLauncher launcher = new LocalLauncher();
+            launcher.explain(plan, pigContext, stream);
+        } catch (Exception ve) {
+            throw new RuntimeException(ve);
+        }
     }
 
     public Collection<ExecJob> runningJobs(Properties properties) throws ExecException {

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java Wed Jul  9 15:20:46 2008
@@ -110,22 +110,29 @@
         return new String(mData);
     }
 
+    /**
+     * Compare two byte arrays.  Comparison is done first using byte values
+     * (compared as signed Java bytes, so 0x80-0xFF sort before 0x00-0x7F),
+     * then length.  So "g" will be greater than "abcdefg", and "hello worlds"
+     * is greater than "hello world".  If the other object is not a
+     * DataByteArray, DataType.compare will be called.
+     * @param other Other object to compare to.
+     * @return -1 if less than, 1 if greater than, 0 if equal.
+     */
     public int compareTo(Object other) {
         if (other instanceof DataByteArray) {
             DataByteArray dba = (DataByteArray)other;
             int mySz = mData.length;
             int tSz = dba.mData.length;
-            if (tSz < mySz) {
-                return 1;
-            } else if (tSz > mySz) {
-                return -1;
-            } else {
-                for (int i = 0; i < mySz; i++) {
-                    if (mData[i] < dba.mData[i]) return -1;
-                    else if (mData[i] > dba.mData[i]) return 1;
-                }
-                return 0;
+            int i;
+            for (i = 0; i < mySz; i++) {
+                // If the other has run out of characters, we're bigger.
+                if (i >= tSz) return 1;
+                if (mData[i] < dba.mData[i]) return -1;
+                else if (mData[i] > dba.mData[i]) return 1;
             }
+            // If the other still has characters left, it's greater
+            if (i < tSz) return -1;
+            return 0;
         } else {
             return DataType.compare(this, other);
         }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java Wed Jul  9 15:20:46 2008
@@ -18,6 +18,7 @@
 package org.apache.pig.impl.builtin;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.Iterator;
 
 import org.apache.pig.EvalFunc;
@@ -30,6 +31,14 @@
 public class FindQuantiles extends EvalFunc<DataBag>{
     BagFactory mBagFactory = BagFactory.getInstance();
     
+    /**
+     * Orders tuples by their natural ordering (Tuple.compareTo).  An instance
+     * is used to create the sorted bag that holds the quantile output.
+     * NOTE(review): this is a non-static inner class but uses no state of the
+     * enclosing FindQuantiles; it could be a static nested class.
+     */
+    private class SortComparator implements Comparator<Tuple> {
+        public int compare(Tuple t1, Tuple t2) {
+            return t1.compareTo(t2);
+        }
+    }
+
+    // Comparator passed to BagFactory.newSortedBag when the output bag is
+    // created.
+    private Comparator<Tuple> mComparator = new SortComparator();
+
     /**
      * first field in the input tuple is the number of quantiles to generate
      * second field is the *sorted* bag of samples
@@ -47,7 +56,8 @@
             ioe.initCause(e);
             throw ioe;
         }
-        DataBag output = mBagFactory.newDefaultBag();
+        // TODO If user provided a comparator we should be using that.
+        DataBag output = mBagFactory.newSortedBag(mComparator);
         
         long numSamples = samples.size();
         

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Wed Jul  9 15:20:46 2008
@@ -93,21 +93,21 @@
 
     @Override
     public Schema getSchema() throws FrontendException {
-        log.info("Entering getSchema");
+        log.trace("Entering getSchema");
         if (!mIsSchemaComputed) {
             List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>(
                     mForEachPlans.size());
 
             for (LogicalPlan plan : mForEachPlans) {
-                log.info("Number of leaves in " + plan + " = " + plan.getLeaves().size());
+                log.debug("Number of leaves in " + plan + " = " + plan.getLeaves().size());
                 for(int i = 0; i < plan.getLeaves().size(); ++i) {
-                    log.info("Leaf" + i + "= " + plan.getLeaves().get(i));
+                    log.debug("Leaf" + i + "= " + plan.getLeaves().get(i));
                 }
                 //LogicalOperator op = plan.getRoots().get(0);
                 LogicalOperator op = plan.getLeaves().get(0);
-                log.info("op: " + op.getClass().getName() + " " + op);
+                log.debug("op: " + op.getClass().getName() + " " + op);
             }
-            log.info("Printed the leaves of the generate plans");
+            log.debug("Printed the leaves of the generate plans");
 
             Map<Schema.FieldSchema, String> flattenAlias = new HashMap<Schema.FieldSchema, String>();
             Map<String, Boolean> inverseFlattenAlias = new HashMap<String, Boolean>();
@@ -116,19 +116,19 @@
             for (int planCtr = 0; planCtr < mForEachPlans.size(); ++planCtr) {
                 LogicalPlan plan = mForEachPlans.get(planCtr);
                 LogicalOperator op = plan.getLeaves().get(0);
-                log.info("op: " + op.getClass().getName() + " " + op);
-                log.info("Flatten: " + mFlatten.get(planCtr));
+                log.debug("op: " + op.getClass().getName() + " " + op);
+                log.debug("Flatten: " + mFlatten.get(planCtr));
                 Schema.FieldSchema planFs;
 
                 try {
 	                planFs = ((ExpressionOperator)op).getFieldSchema();
-                    log.info("planFs: " + planFs);
+                    log.debug("planFs: " + planFs);
 					if(null != planFs) {
 						String outerCanonicalAlias = op.getAlias();
 						if(null == outerCanonicalAlias) {
 							outerCanonicalAlias = planFs.alias;
 						}
-						log.info("Outer canonical alias: " + outerCanonicalAlias);
+						log.debug("Outer canonical alias: " + outerCanonicalAlias);
 						if(mFlatten.get(planCtr)) {
 							//need to extract the children and create the aliases
 							//assumption here is that flatten is only for one column
@@ -137,8 +137,8 @@
 							Schema s = planFs.schema;
 							if(null != s) {
 								for(Schema.FieldSchema fs: s.getFields()) {
-									log.info("fs: " + fs);
-									log.info("fs.alias: " + fs.alias);
+									log.debug("fs: " + fs);
+									log.debug("fs.alias: " + fs.alias);
 									String innerCanonicalAlias = fs.alias;
 									if((null != outerCanonicalAlias) && (null != innerCanonicalAlias)) {
 										String disambiguatorAlias = outerCanonicalAlias + "::" + innerCanonicalAlias;
@@ -196,19 +196,19 @@
 			//check for duplicate column names and throw an error if there are duplicates
 			//ensure that flatten gets rid of duplicate column names when the checks are
 			//being done
-			log.info(" flattenAlias: " + flattenAlias);
-			log.info(" inverseFlattenAlias: " + inverseFlattenAlias);
-			log.info(" aliases: " + aliases);
-			log.info(" fss.size: " + fss.size());
+			log.debug(" flattenAlias: " + flattenAlias);
+			log.debug(" inverseFlattenAlias: " + inverseFlattenAlias);
+			log.debug(" aliases: " + aliases);
+			log.debug(" fss.size: " + fss.size());
 			boolean duplicates = false;
 			Map<String, Integer> duplicateAliases = new HashMap<String, Integer>();
 			for(String alias: aliases.keySet()) {
 				Integer count = aliases.get(alias);
 				if(count > 1) {//not checking for null here as counts are intitalized to 1
 					Boolean inFlatten = false;
-					log.info("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
+					log.debug("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
 					inFlatten = inverseFlattenAlias.get(alias);
-					log.info("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
+					log.debug("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
 					if((null == inFlatten) || (!inFlatten)) {
 						duplicates = true;
 						duplicateAliases.put(alias, count);
@@ -237,14 +237,14 @@
 				String alias = flattenAlias.get(fs);
 				Integer count = aliases.get(alias);
 				if (null == count) count = 1;
-				log.info("alias: " + alias);
+				log.debug("alias: " + alias);
 				if((null != alias) && (count == 1)) {
 					mSchema.addAlias(alias, fs);
 				}
 			}
             mIsSchemaComputed = true;
         }
-        log.info("Exiting getSchema");
+        log.trace("Exiting getSchema");
         return mSchema;
     }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java Wed Jul  9 15:20:46 2008
@@ -88,6 +88,10 @@
         mSortFunc = func;
     }
 
+    /**
+     * Returns the flag last set by {@link #setStar(boolean)}.
+     * NOTE(review): presumably true when the sort key is "*" (all columns);
+     * confirm against the parser that calls setStar.
+     * @return the current star flag
+     */
+    public boolean isStar() {
+        return mIsStar;
+    }
+
     public void setStar(boolean b) {
         mIsStar = b;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java Wed Jul  9 15:20:46 2008
@@ -108,6 +108,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -137,6 +138,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -165,6 +167,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -193,6 +196,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -221,6 +225,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -249,6 +254,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -276,6 +282,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -303,6 +310,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -330,6 +338,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -357,6 +366,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -384,6 +394,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -411,6 +422,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -434,6 +446,7 @@
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -457,6 +470,7 @@
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -478,6 +492,7 @@
             currentPlan.connect(from, exprOp);
         } catch (PlanException e) {
             log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            throw new VisitorException(e);
         }
     }
 
@@ -501,6 +516,7 @@
         } catch (PlanException e1) {
             log.error("Invalid physical operators in the physical plan"
                     + e1.getMessage());
+            throw new VisitorException(e1);
         }
 
         int count = 0;
@@ -543,6 +559,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operators in the physical plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
 
         }
@@ -582,6 +599,7 @@
         } catch (PlanException e) {
             log.error("Invalid physical operators in the physical plan"
                     + e.getMessage());
+            throw new VisitorException(e);
         }
     }
 
@@ -612,6 +630,7 @@
 
                 log.error("Invalid physical operators in the physical plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }
@@ -657,6 +676,7 @@
         } catch (PlanException e) {
             log.error("Invalid physical operators in the physical plan"
                     + e.getMessage());
+            throw new VisitorException(e);
         }
 
     }
@@ -703,6 +723,7 @@
             currentPlan.connect(from, sort);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
+            throw new VisitorException(e);
         }
 
         sort.setResultType(s.getType());
@@ -727,6 +748,7 @@
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
+            throw new VisitorException(e);
         }
     }
 
@@ -744,6 +766,7 @@
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
+            throw new VisitorException(e);
         }
     }
 
@@ -772,6 +795,7 @@
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
+            throw new VisitorException(e);
         }
     }
 
@@ -799,6 +823,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operator in the plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
         LogToPhyMap.put(func, p);
@@ -834,6 +859,7 @@
             currentPlan.connect(from, store);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
+            throw new VisitorException(e);
         }
         LogToPhyMap.put(loStore, store);
     }
@@ -878,6 +904,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operator in the plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
 
@@ -899,6 +926,7 @@
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
+            throw new VisitorException(e);
         }
 
     }
@@ -918,6 +946,7 @@
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
+            throw new VisitorException(e);
         }
 
     }
@@ -939,6 +968,7 @@
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
+            throw new VisitorException(e);
         }
 
     }
@@ -959,6 +989,7 @@
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
+            throw new VisitorException(e);
         }
 
     }
@@ -980,6 +1011,7 @@
             } catch (PlanException e) {
                 log.error("Invalid physical operator in the plan"
                         + e.getMessage());
+                throw new VisitorException(e);
             }
         }
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java Wed Jul  9 15:20:46 2008
@@ -25,13 +25,24 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.IndexedTuple;
@@ -61,6 +72,8 @@
     Configuration conf;
     PigContext pigContext;
     
+    private final Log log = LogFactory.getLog(getClass());
+
     /**
      * The map between MapReduceOpers and their corresponding Jobs
      */
@@ -257,9 +270,7 @@
                 jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
                 Class<? extends WritableComparable> keyClass = DataType.getWritableComparableTypes(pack.getKeyType()).getClass();
                 jobConf.setOutputKeyClass(keyClass);
-                if(keyClass.equals(TupleFactory.getInstance().tupleClass())){
-                    jobConf.setOutputKeyComparatorClass(PigWritableComparator.class);
-                }
+                selectComparator(mro, pack.getKeyType(), jobConf);
                 jobConf.setOutputValueClass(IndexedTuple.class);
             }
             
@@ -290,12 +301,128 @@
     }
     
+    /**
+     * Base Hadoop key comparator that orders keys by comparing their
+     * serialized bytes with WritableComparator.compareBytes, rather than by
+     * deserializing them and calling the key type's own compareTo.
+     * Subclasses bind the key class by passing it to the protected
+     * constructor.
+     * NOTE(review): a byte-wise order matches a type's natural order only for
+     * some encodings; selectComparator installs these comparators only for
+     * jobs that are not part of an order by.
+     */
     public static class PigWritableComparator extends WritableComparator {
-        public PigWritableComparator() {
-            super(TupleFactory.getInstance().tupleClass());
+        // c is the WritableComparable key class this comparator handles.
+        protected PigWritableComparator(Class c) {
+            super(c);
         }
 
+        // Raw lexicographic comparison of the serialized key bytes.
         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
             return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
         }
     }
+
+    // Concrete byte-wise comparators, one per Pig key type.  Each one only
+    // binds the key class passed to PigWritableComparator; the comparison is
+    // inherited.  They expose public no-arg constructors, presumably because
+    // Hadoop instantiates the class given to setOutputKeyComparatorClass
+    // reflectively.
+    public static class PigIntWritableComparator extends PigWritableComparator {
+        public PigIntWritableComparator() {
+            super(IntWritable.class);
+        }
+    }
+
+    public static class PigLongWritableComparator extends PigWritableComparator {
+        public PigLongWritableComparator() {
+            super(LongWritable.class);
+        }
+    }
+
+    public static class PigFloatWritableComparator extends PigWritableComparator {
+        public PigFloatWritableComparator() {
+            super(FloatWritable.class);
+        }
+    }
+
+    // NOTE(review): disabled until Hadoop provides a DoubleWritable (see
+    // selectComparator).  When re-enabled it should pass the Writable key
+    // class; Double.class below is not a WritableComparable.
+    /*
+    public static class PigDoubleWritableComparator extends PigWritableComparator {
+        public PigDoubleWritableComparator() {
+            super(Double.class);
+        }
+    }
+    */
+
+    public static class PigCharArrayWritableComparator extends PigWritableComparator {
+        public PigCharArrayWritableComparator() {
+            super(Text.class);
+        }
+    }
+
+    // NOTE(review): assumes bytearray keys are serialized as BytesWritable;
+    // confirm against DataType.getWritableComparableTypes.
+    public static class PigDBAWritableComparator extends PigWritableComparator {
+        public PigDBAWritableComparator() {
+            super(BytesWritable.class);
+        }
+    }
+
+    // Key class is the concrete tuple class produced by TupleFactory.
+    public static class PigTupleWritableComparator extends PigWritableComparator {
+        public PigTupleWritableComparator() {
+            super(TupleFactory.getInstance().tupleClass());
+        }
+    }
+
+    // Creates a throwaway default bag only to obtain the concrete bag class.
+    public static class PigBagWritableComparator extends PigWritableComparator {
+        public PigBagWritableComparator() {
+            super(BagFactory.getInstance().newDefaultBag().getClass());
+        }
+    }
+
+    /**
+     * Chooses the output key comparator for the job built from {@code mro}.
+     *
+     * If {@code mro} is a global sort, or its first successor in the MR plan
+     * is, no comparator is set.  This leaves Hadoop's default comparator for
+     * the key class in effect (presumably the "native comparators" mentioned
+     * below).  Otherwise the byte-wise Pig*WritableComparator that matches
+     * {@code keyType} is installed.
+     *
+     * NOTE(review): if getSuccessors returns an empty (non-null) list,
+     * succs.get(0) throws IndexOutOfBoundsException.  Only the first
+     * successor is inspected.
+     * NOTE(review): the DOUBLE/MAP errors are raised only on the non-sort
+     * path; order-by jobs with such keys are not rejected here.
+     *
+     * @param mro the MapReduce operator whose job is being configured
+     * @param keyType Pig DataType code of the job's output key
+     * @param jobConf job configuration to update
+     * @throws JobCreationException if the key type is DOUBLE or MAP and the
+     *         job is not part of an order by
+     * @throws RuntimeException if the key type has no case in the switch
+     */
+    private void selectComparator(
+            MapReduceOper mro,
+            byte keyType,
+            JobConf jobConf) throws JobCreationException {
+        // If this operator is involved in an order by, use the native
+        // comparators.  Otherwise use bytewise comparison.  Have to
+        // look at the next operator too because if we're the quantile
+        // operation we need to use the native comparators.
+        boolean involved = false;
+        if (mro.isGlobalSort()) {
+            involved = true;
+        } else {
+            List<MapReduceOper> succs = plan.getSuccessors(mro);
+            if (succs != null) {
+                MapReduceOper succ = succs.get(0);
+                if (succ.isGlobalSort()) involved = true;
+            }
+        }
+        if (!involved) {
+            switch (keyType) {
+            case DataType.INTEGER:
+                jobConf.setOutputKeyComparatorClass(PigIntWritableComparator.class);
+                break;
+
+            case DataType.LONG:
+                jobConf.setOutputKeyComparatorClass(PigLongWritableComparator.class);
+                break;
+
+            case DataType.FLOAT:
+                jobConf.setOutputKeyComparatorClass(PigFloatWritableComparator.class);
+                break;
+
+            case DataType.DOUBLE:
+                //jobConf.setOutputKeyComparatorClass(PigDoubleWritableComparator.class);
+                log.error("Waiting for Hadoop to support DoubleWritable");
+                throw new JobCreationException("Waiting for Hadoop to support DoubleWritable");
+
+            case DataType.CHARARRAY:
+                jobConf.setOutputKeyComparatorClass(PigCharArrayWritableComparator.class);
+                break;
+
+            case DataType.BYTEARRAY:
+                jobConf.setOutputKeyComparatorClass(PigDBAWritableComparator.class);
+                break;
+
+            case DataType.MAP:
+                log.error("Using Map as key not supported.");
+                throw new JobCreationException("Using Map as key not supported");
+
+            case DataType.TUPLE:
+                jobConf.setOutputKeyComparatorClass(PigTupleWritableComparator.class);
+                break;
+
+            case DataType.BAG:
+                jobConf.setOutputKeyComparatorClass(PigBagWritableComparator.class);
+                break;
+
+            default:
+                // Programming error: a DataType code with no case above.
+                throw new RuntimeException("Forgot case for type " +
+                    DataType.findTypeName(keyType));
+            }
+
+        }
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java Wed Jul  9 15:20:46 2008
@@ -1,6 +1,7 @@
 package org.apache.pig.impl.mapReduceLayer;
 
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -62,6 +63,22 @@
     public abstract boolean launchPig(PhysicalPlan php, String grpName, PigContext pc)
             throws PlanException, VisitorException, IOException, ExecException,
             JobCreationException;
+
+    /**
+     * Explain how a pig job will be executed on the underlying infrastructure.
+     * @param pp PhysicalPlan to explain
+     * @param pc PigContext to use for configuration
+     * @param ps PrintStream the explanation is written to
+     * @throws PlanException if compiling the plan fails
+     * @throws VisitorException if walking the plan fails
+     * @throws IOException on I/O failure
+     */
+    public abstract void explain(
+            PhysicalPlan pp,
+            PigContext pc,
+            PrintStream ps) throws PlanException,
+                                   VisitorException,
+                                   IOException;
     
     protected boolean isComplete(double prog){
         return (int)(Math.ceil(prog)) == (int)1;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java Wed Jul  9 15:20:46 2008
@@ -1,6 +1,7 @@
 package org.apache.pig.impl.mapReduceLayer;
 
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -13,6 +14,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.mapReduceLayer.plans.MRPrinter;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
@@ -79,7 +81,21 @@
         
         return isComplete(lastProg);
     }
-    
+
+    /**
+     * Compiles the physical plan into a map-reduce plan and prints it with {@link MRPrinter}.
+     */
+    @Override
+    public void explain(PhysicalPlan php, PigContext pc, PrintStream ps)
+            throws PlanException, VisitorException, IOException {
+        MRCompiler compiler = new MRCompiler(php, pc);
+        compiler.compile();
+        MROperPlan mrPlan = compiler.getMRPlan();
+
+        MRPrinter printer = new MRPrinter(ps, mrPlan);
+        printer.visit();
+    }
+ 
     //A purely testing method. Not to be used elsewhere
     public boolean launchPigWithCombinePlan(PhysicalPlan php,
             String grpName, PigContext pc, PhysicalPlan combinePlan) throws PlanException,

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java Wed Jul  9 15:20:46 2008
@@ -179,14 +179,6 @@
      */
     public MROperPlan compile() throws IOException, PlanException, VisitorException {
         List<PhysicalOperator> leaves = plan.getLeaves();
-        /*for (PhysicalOperator operator : leaves) {
-            compile(operator);
-            if (!curMROp.isMapDone()) {
-                curMROp.setMapDone(true);
-            } else if (!curMROp.isReduceDone()) {
-                curMROp.setReduceDone(true);
-            }
-        }*/
         POStore store = (POStore)leaves.get(0);
         compile(store);
         

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java Wed Jul  9 15:20:46 2008
@@ -1,6 +1,7 @@
 package org.apache.pig.impl.mapReduceLayer;
 
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -15,6 +16,7 @@
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.mapReduceLayer.plans.MRPrinter;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.PlanException;
@@ -83,4 +85,19 @@
         
         return isComplete(lastProg);
     }
+
+    /**
+     * Compiles the physical plan into a map-reduce plan and prints it with {@link MRPrinter}.
+     */
+    @Override
+    public void explain(PhysicalPlan php, PigContext pc, PrintStream ps)
+            throws PlanException, VisitorException, IOException {
+        MRCompiler compiler = new MRCompiler(php, pc);
+        compiler.compile();
+        MROperPlan mrPlan = compiler.getMRPlan();
+
+        MRPrinter printer = new MRPrinter(ps, mrPlan);
+        printer.visit();
+    }
+ 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java Wed Jul  9 15:20:46 2008
@@ -53,7 +53,7 @@
  * key and indexed tuple and collect it into the output
  * collector.
  * 
- * The shuffle and sort phase sorts these key & indexed tuples
+ * The shuffle and sort phase sorts these key &amp; indexed tuples
  * and creates key, List<IndexedTuple> and passes the key and
  * iterator to the list. The deserialized POPackage operator
  * is used to package the key, List<IndexedTuple> into pigKey, 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java Wed Jul  9 15:20:46 2008
@@ -29,6 +29,7 @@
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -63,10 +64,18 @@
                 t = loader.getNext();
                 if (t==null)
                     break;
-                quantiles.add(t);
+                // Need to strip the outer tuple and bag.
+                Object o = t.get(0);
+                if (o instanceof DataBag) {
+                    for (Tuple it : (DataBag)o) {
+                        quantiles.add(it);
+                    }
+                } else {
+                    quantiles.add(t);
+                }
             }
             this.quantiles = quantiles.toArray(new Tuple[0]);
-        }catch (IOException e){
+        }catch (Exception e){
             throw new RuntimeException(e);
         }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java Wed Jul  9 15:20:46 2008
@@ -32,13 +32,7 @@
         super(plan, walker);
     }
 
-    @Override
-    public void visit() throws VisitorException {
-        // TODO Auto-generated method stub
-
-    }
-
-    public void visitMROp(MapReduceOper mr) {
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
         // do nothing
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java Wed Jul  9 15:20:46 2008
@@ -22,6 +22,8 @@
 import java.util.List;
 import java.io.PrintStream;
 
+import org.apache.pig.impl.mapReduceLayer.MapReduceOper;
+import org.apache.pig.impl.physicalLayer.plans.PlanPrinter;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -38,252 +40,39 @@
      * @param plan MR plan to print
      */
     public MRPrinter(PrintStream ps, MROperPlan plan) {
-        super(plan, new DepthFirstWalker(plan));
-    }
-
-    /* TODO FIX
-    public void visit(LOAdd a) throws VisitorException {
-        visitBinary(a, "+");
-    }
-
-    public void visit(LOAnd a) throws VisitorException {
-        visitBinary(a, "AND");
-    }
-    
-    public void visit(LOBinCond bc) throws VisitorException {
-        print(bc);
-        mStream.print(" COND: (");
-        bc.getCond().visit(this);
-        mStream.print(") TRUE: (");
-        bc.getLhsOp().visit(this);
-        mStream.print(") FALSE (");
-        bc.getRhsOp().visit(this);
-        mStream.print(")");
-    }
-
-    public void visit(LOCogroup g) throws VisitorException {
-        print(g);
-        mStream.print("GROUP BY PLANS:");
-        MultiMap<LogicalOperator, LogicalPlan> plans = g.getGroupByPlans();
-        for (LogicalOperator lo : plans.keySet()) {
-            // Visit the associated plans
-            for (LogicalPlan plan : plans.get(lo)) {
-                mIndent++;
-                pushWalker(new DepthFirstWalker(plan));
-                visit();
-                popWalker();
-                mIndent--;
-            }
-            mStream.println();
-        }
-        // Visit input operators
-        for (LogicalOperator lo : plans.keySet()) {
-            // Visit the operator
-            lo.visit(this);
-        }
-    }
-        
-    public void visit(LOConst c) throws VisitorException {
-        print(c);
-        mStream.print(" VALUE (" + c.getValue() + ")");
-    }
-
-    public void visit(LOCross c) throws VisitorException {
-        print(c);
-        mStream.println();
-        super.visit(c);
-    }
-
-    public void visit(LODistinct d) throws VisitorException {
-        print(d);
-        mStream.println();
-        super.visit(d);
-    }
-
-    public void visit(LODivide d) throws VisitorException {
-        visitBinary(d, "/");
-    }
-
-    public void visit(LOEqual e) throws VisitorException {
-        visitBinary(e, "==");
-    }
-
-    public void visit(LOFilter f) throws VisitorException {
-        print(f);
-        mStream.print(" COMP: ");
-        mIndent++;
-        pushWalker(new DepthFirstWalker(f.getComparisonPlan()));
-        visit();
-        mIndent--;
-        mStream.println();
-        f.getInput().visit(this);
-    }
-
-     public void visit(LOForEach f) throws VisitorException {
-        print(f);
-        mStream.print(" PLAN: ");
-        mIndent++;
-        pushWalker(new DepthFirstWalker(f.getForEachPlan()));
-        visit();
-        mIndent--;
-        mStream.println();
-        // Visit our input
-        mPlan.getPredecessors((LogicalOperator)f).get(0).visit(this);
-    }
- 
-    public void visit(LOGreaterThan gt) throws VisitorException {
-        visitBinary(gt, ">");
-    }
-
-    public void visit(LOGreaterThanEqual gte) throws VisitorException {
-        visitBinary(gte, ">=");
-    }
-
-    public void visit(LOLesserThan lt) throws VisitorException {
-        visitBinary(lt, "<");
-    }
-
-    public void visit(LOLesserThanEqual lte) throws VisitorException {
-        visitBinary(lte, "<=");
-    }
-
-    public void visit(LOLoad load) throws VisitorException {
-        print(load);
-        mStream.print(" FILE: " + load.getInputFile().getFileName());
-        mStream.print(" FUNC: " + load.getLoadFunc().getClass().getName());
-        mStream.println();
-    }
-
-    public void visit(LOMapLookup mlu) throws VisitorException {
-        print(mlu);
-        mStream.print("(");
-        mlu.getMap().visit(this);
-        mStream.print(")# " + mlu.getKey());
-    }
-
-    public void visit(LOMod m) throws VisitorException {
-        visitBinary(m, "MOD");
-    }
-
-    public void visit(LOMultiply m) throws VisitorException {
-        visitBinary(m, "*");
-    }
-
-    public void visit(LONegative n) throws VisitorException {
-        visitUnary(n, "-");
-    }
-
-    public void visit(LONot n) throws VisitorException {
-        visitUnary(n, "NOT");
-    }
-
-    public void visit(LONotEqual ne) throws VisitorException {
-        visitBinary(ne, "!=");
-    }
-
-    public void visit(LOOr or) throws VisitorException {
-        visitBinary(or, "OR");
-    }
-
-    public void visit(LOProject p) throws VisitorException {
-        print(p);
-        if (p.isStar()) {
-            mStream.print(" ALL ");
-        } else {
-            List<Integer> cols = p.getProjection();
-            mStream.print(" COL");
-            if (cols.size() > 1) mStream.print("S");
-            mStream.print(" (");
-            for (int i = 0; i < cols.size(); i++) {
-                if (i > 0) mStream.print(", ");
-                mStream.print(cols.get(i));
-            }
-            mStream.print(")");
-        }
-        mStream.print(" FROM ");
-        if (p.getSentinel()) {
-            // This project is connected to some other relation, don't follow
-            // that path or we'll cycle in the graph.
-            p.getExpression().name();
-        } else {
-            mIndent++;
-            p.getExpression().visit(this);
-            mIndent--;
-        }
-    }
-
-    public void visit(LORegexp r) throws VisitorException {
-        print(r);
-        mStream.print(" REGEX (" + r.getRegexp() + ") LOOKING IN (");
-        r.getOperand().visit(this);
-        mStream.print(")");
-    }
-
-    private void print(LogicalOperator lo, String name) {
-        List<EvalSpec> empty = new ArrayList<EvalSpec>();
-        print(lo, name, empty);
-    }
-
-    private void visitBinary(
-            BinaryExpressionOperator b,
-            String op) throws VisitorException {
-        print(b);
-        mStream.print(" (");
-        b.getLhsOperand().visit(this);
-        mStream.print(") " + op + " (");
-        b.getRhsOperand().visit(this);
-        mStream.print(") ");
-    }
-
-    private void visitUnary(
-            UnaryExpressionOperator e,
-            String op) throws VisitorException {
-        print(e);
-        mStream.print(op + " (");
-        e.getOperand().visit(this);
-        mStream.print(") ");
-    }
-
-    private void print(LogicalOperator lo) {
-        for (int i = 0; i < mIndent; i++) mStream.print("    ");
-
-        printName(lo);
-
-        if (!(lo instanceof ExpressionOperator)) {
-            mStream.print("Inputs: ");
-            for (LogicalOperator predecessor : mPlan.getPredecessors(lo)) {
-                printName(predecessor);
-            }
-            mStream.print("Schema: ");
-            try {
-                printSchema(lo.getSchema());
-            } catch (FrontendException fe) {
-                // ignore it, nothing we can do
-                mStream.print("()");
-            }
-        }
-        mStream.print(" : ");
-    }
-
-    private void printName(LogicalOperator lo) {
-        mStream.println(lo.name() + " key(" + lo.getOperatorKey().scope + 
-            ", " + lo.getOperatorKey().id + ") ");
-    }
-
-    private void printSchema(Schema schema) {
-        mStream.print("(");
-        for (Schema.FieldSchema fs : schema.getFields()) {
-            if (fs.alias != null) mStream.print(fs.alias + ": ");
-            mStream.print(DataType.findTypeName(fs.type));
-            if (fs.schema != null) {
-                if (fs.type == DataType.BAG) mStream.print("{");
-                printSchema(fs.schema);
-                if (fs.type == DataType.BAG) mStream.print("}");
-            }
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+        mStream = ps;
+        mStream.println("--------------------------------------------------");
+        mStream.println("| Map Reduce Plan                                |");
+        mStream.println("--------------------------------------------------");
+    }
+
+    @Override
+    public void visitMROp(MapReduceOper mr) throws VisitorException { // prints node key, each non-empty map/combine/reduce plan, then sort info
+        mStream.println("MapReduce node " + mr.getOperatorKey().toString());
+        if (mr.mapPlan != null && mr.mapPlan.size() > 0) {
+            mStream.println("Map Plan");
+            PlanPrinter printer = new PlanPrinter(mr.mapPlan); // NOTE(review): ps/mStream is not passed in; confirm PlanPrinter writes to the same stream
+            printer.visit();
+            mStream.println("--------");
+        }
+        if (mr.combinePlan != null && mr.combinePlan.size() > 0) {
+            mStream.println("Combine Plan");
+            PlanPrinter printer = new PlanPrinter(mr.combinePlan); // same output-stream caveat as the map plan printer
+            printer.visit();
+            mStream.println("--------");
+        }
+        if (mr.reducePlan != null && mr.reducePlan.size() > 0) {
+            mStream.println("Reduce Plan");
+            PlanPrinter printer = new PlanPrinter(mr.reducePlan); // same output-stream caveat as the map plan printer
+            printer.visit();
+            mStream.println("--------");
+        }
+        mStream.println("Global sort: " + mr.isGlobalSort());
+        if (mr.getQuantFile() != null) { // quantile file line is printed only when one is set
+            mStream.println("Quantile file: " + mr.getQuantFile());
         }
-        mStream.print(")");
+        mStream.println("----------------");
     }
-    */
 }
 
-        

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java Wed Jul  9 15:20:46 2008
@@ -66,6 +66,7 @@
 	private List<Boolean> mAscCols;
 	private POUserComparisonFunc mSortFunc;
 	private final Log log = LogFactory.getLog(getClass());
+	private Comparator<Tuple> mComparator;
 
 	private boolean inputsAccumulated = false;
 	public boolean isUDFComparatorUsed = false;
@@ -80,16 +81,18 @@
 		this.mAscCols = mAscCols;
 		this.mSortFunc = mSortFunc;
 		if (mSortFunc == null) {
-			sortedBag = BagFactory.getInstance().newSortedBag(
-					new SortComparator());
+            mComparator = new SortComparator();
+			/*sortedBag = BagFactory.getInstance().newSortedBag(
+					new SortComparator());*/
 			ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
 
 			for(PhysicalPlan plan : sortPlans) {
 				ExprOutputTypes.add(plan.getLeaves().get(0).getResultType());
 			}
 		} else {
-			sortedBag = BagFactory.getInstance().newSortedBag(
-					new UDFSortComparator());
+			/*sortedBag = BagFactory.getInstance().newSortedBag(
+					new UDFSortComparator());*/
+            mComparator = new UDFSortComparator();
 			isUDFComparatorUsed = true;
 		}
 	}
@@ -171,6 +174,15 @@
             case DataType.LONG:
                 res = Op.getNext(dummyLong);
                 break;
+            case DataType.TUPLE:
+                res = Op.getNext(dummyTuple);
+                break;
+
+            default:
+                String msg = new String("Did not expect result of type " +
+                    DataType.findTypeName(resultType));
+                log.error(msg);
+                throw new RuntimeException(msg);
             }
 			return res;
 		}
@@ -220,6 +232,7 @@
 		Result res = new Result();
 		if (!inputsAccumulated) {
 			res = processInput();
+            sortedBag = BagFactory.getInstance().newSortedBag(mComparator);
 			while (res.returnStatus != POStatus.STATUS_EOP) {
 				if (res.returnStatus == POStatus.STATUS_ERR) {
 					log.error("Error in reading from the inputs");

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java Wed Jul  9 15:20:46 2008
@@ -314,6 +314,67 @@
     }
 
     @Test
+    public void testMultiFieldTupleCompareTo() throws Exception {
+        TupleFactory tf = TupleFactory.getInstance();
+        // t1 is (bbb, bbb) throughout; each t2 below varies one or both columns.
+        Tuple t1 = tf.newTuple();
+        Tuple t2 = tf.newTuple();
+
+        t1.append(new DataByteArray("bbb"));
+        t1.append(new DataByteArray("bbb"));
+        t2.append(new DataByteArray("bbb"));
+        t2.append(new DataByteArray("bbb"));
+
+        assertEquals("same data equal", 0, t1.compareTo(t2));
+
+        t2 = tf.newTuple();
+        t2.append(new DataByteArray("aaa"));
+        t2.append(new DataByteArray("aaa"));
+        assertTrue("greater than tuple with lesser values", t1.compareTo(t2) > 0);
+
+        t2 = tf.newTuple();
+        t2.append(new DataByteArray("ddd"));
+        t2.append(new DataByteArray("ddd"));
+        assertTrue("less than tuple with greater values", t1.compareTo(t2) < 0);
+
+        // First column same, second lesser
+        t2 = tf.newTuple();
+        t2.append(new DataByteArray("bbb"));
+        t2.append(new DataByteArray("aaa"));
+        assertTrue("same first column, greater second column", t1.compareTo(t2) > 0);
+
+        // First column same, second greater
+        t2 = tf.newTuple();
+        t2.append(new DataByteArray("bbb"));
+        t2.append(new DataByteArray("ccc"));
+        assertTrue("same first column, lesser second column", t1.compareTo(t2) < 0);
+
+        // First column less, second same
+        t2 = tf.newTuple();
+        t2.append(new DataByteArray("aaa"));
+        t2.append(new DataByteArray("bbb"));
+        assertTrue("greater first column, same second column", t1.compareTo(t2) > 0);
+
+        // First column greater, second same
+        t2 = tf.newTuple();
+        t2.append(new DataByteArray("ccc"));
+        t2.append(new DataByteArray("bbb"));
+        assertTrue("lesser first column, same second column", t1.compareTo(t2) < 0);
+
+        // First column less, second greater
+        t2 = tf.newTuple();
+        t2.append(new DataByteArray("aaa"));
+        t2.append(new DataByteArray("ccc"));
+        assertTrue("greater first column wins", t1.compareTo(t2) > 0);
+
+        // First column greater, second less
+        t2 = tf.newTuple();
+        t2.append(new DataByteArray("ccc"));
+        t2.append(new DataByteArray("aaa"));
+        assertTrue("lesser first column wins", t1.compareTo(t2) < 0);
+    }
+
+    @Test
     public void testByteArrayToString() throws Exception {
         DataByteArray ba = new DataByteArray("hello world");
 
@@ -350,10 +411,23 @@
 
         assertTrue("same data", ba1.compareTo(ba2) == 0);
 
-        assertFalse("lexically lower value less than",
+        assertTrue("different length lexically lower value less than",
             ba3.compareTo(ba1) < 0);
-        assertFalse("lexically higher value greater than",
+        assertTrue("different length lexically higher value greater than",
             ba1.compareTo(ba3) > 0);
+
+        ba2 = new DataByteArray("hello worlc");
+        assertTrue("same length lexically lower value less than",
+            ba2.compareTo(ba1) < 0);
+        assertTrue("same length lexically higher value greater than",
+            ba1.compareTo(ba2) > 0);
+
+        ba2 = new DataByteArray("hello worlds");
+        assertTrue("shorter lexically same value less than",
+            ba1.compareTo(ba2) < 0);
+        assertTrue("longer lexically same value greater than",
+            ba2.compareTo(ba1) > 0);
+
     }
 
     private Tuple giveMeOneOfEach() throws Exception {