Posted to commits@pig.apache.org by ol...@apache.org on 2008/12/24 01:44:51 UTC

svn commit: r729182 [1/2] - in /hadoop/pig/branches/types: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ src/org/apache/pig/backend/hadoop/executionengine/ph...

Author: olga
Date: Tue Dec 23 16:44:50 2008
New Revision: 729182

URL: http://svn.apache.org/viewvc?rev=729182&view=rev
Log:
PIG-563: support for multiple combiner invocations

Modified:
    hadoop/pig/branches/types/CHANGES.txt
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/AVG.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleAvg.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMax.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMin.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleSum.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatAvg.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMax.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMin.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatSum.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/IntAvg.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMax.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMin.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/IntSum.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/LongAvg.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMax.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMin.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/LongSum.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/MAX.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/MIN.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/SUM.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/StringMax.java
    hadoop/pig/branches/types/src/org/apache/pig/builtin/StringMin.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestCombiner.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
    hadoop/pig/branches/types/test/org/apache/pig/test/Util.java
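
For context on the shape of the change: Pig's algebraic built-ins implement
the org.apache.pig.Algebraic interface, whose three methods name the inner
EvalFunc classes to run in each evaluation phase. A sketch of that contract
as the diffs below use it (the getIntermed()/Intermediate renames and the
new Intermediate classes all hang off these methods):

    // Sketch of the Algebraic contract. After this commit, Initial runs
    // once per value in the map, Intermediate may run zero or more times
    // in the combiner, and Final runs once in the reduce.
    public interface Algebraic {
        String getInitial();   // class name, e.g. COUNT$Initial (map)
        String getIntermed();  // e.g. COUNT$Intermediate (combiner)
        String getFinal();     // e.g. COUNT$Final (reduce)
    }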

Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Tue Dec 23 16:44:50 2008
@@ -343,3 +343,5 @@
 	PIG-6: Add load support from hbase (hustlmsp via gates).
 
     PIG-522: make negation work (pradeepk via olgan)
+
+    PIG-563: support for multiple combiner invocations

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Tue Dec 23 16:44:50 2008
@@ -28,17 +28,20 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POPrinter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -167,12 +170,29 @@
             if (ap != null) {
                 log.info("Choosing to move algebraic foreach to combiner");
 
-                // Need to insert a foreach in the combine plan.  It will
-                // have one inner plan for each inner plan in the foreach
-                // we're duplicating.  For projections, the plan will be
-                // the same.  For algebraic udfs, the plan will have the
-                // initial version of the function.  The reduce plan will
-                // be changed to have the final version.
+                // Need to insert two new foreachs - one in the combine plan
+                // and one in the map plan; both will be based on the reduce foreach.
+                // The map foreach will have one inner plan for each
+                // inner plan in the foreach we're duplicating.  For
+                // projections, the plan will be the same.  For algebraic
+                // udfs, the plan will have the initial version of the function.
+
+                // The combine foreach will have one inner plan for each
+                // inner plan in the foreach we're duplicating.  For
+                // projections, the project operators will be changed to
+                // project the same column as their position in the
+                // foreach.  For algebraic udfs, the plan will have the
+                // intermediate version of the function.  The input to the
+                // udf will be a POProject which will project the column
+                // corresponding to the position of the udf in the foreach.
+
+                // In the inner plans of the reduce foreach, for
+                // projections, the project operators will be changed to
+                // project the same column as their position in the
+                // foreach.  For algebraic udfs, the plan will have the
+                // final version of the function.  The input to the
+                // udf will be a POProject which will project the column
+                // corresponding to the position of the udf in the foreach.
                 if (mr.combinePlan.getRoots().size() != 0) {
                     log.warn("Wasn't expecting to find anything already "
                         + "in the combiner!");
@@ -185,27 +205,14 @@
                     if (mKeyType == 0) {
                         mKeyType = rearrange.getKeyType();
                     }
-                    POPackage cp = pack.clone();
-                    mr.combinePlan.add(cp);
+
+                    POForEach mfe = foreach.clone();
                     POForEach cfe = foreach.clone();
-                    fixUpForeachs(cfe, foreach, ap);
-                    mr.combinePlan.add(cfe);
-                    mr.combinePlan.connect(cp, cfe);
-                    // No need to connect projections in cfe to cp, because
-                    // PigCombiner directly attaches output from package to
-                    // root of remaining plan.
-                    POLocalRearrange clr = rearrange.clone();
-                    fixUpRearrange(clr);
-                    mr.combinePlan.add(clr);
-                    mr.combinePlan.connect(cfe, clr);
+                    fixUpForeachs(mfe, cfe, foreach, ap);
                     
-                    // stream input to the algebraics in the 
-                    // combine plan
-                    LastInputStreamingOptimizer.replaceWithPOJoinPackage(
-                            mr.combinePlan, cp, cfe, chunkSize);
                     
                     // Use the ExprType list returned from algebraic to tell
-                    // POPostCombinerPackage which fields need projected and
+                    // POCombinerPackage which fields need projected and
                     // which placed in bags.
                     int numFields = (mKeyField >= ap.size()) ? mKeyField + 1 :
                         ap.size();
@@ -215,12 +222,47 @@
                         else bags[i] = true;
                     }
                     bags[mKeyField] = false;
+                    // Use the POCombinerPackage in the combine plan
+                    // as it needs to act differently than the regular
+                    // package operator.
+                    POCombinerPackage combinePack =
+                        new POCombinerPackage(pack, bags);
+                    mr.combinePlan.add(combinePack);
+                    mr.combinePlan.add(cfe);
+                    mr.combinePlan.connect(combinePack, cfe);
+                    // No need to connect projections in cfe to cp, because
+                    // PigCombiner directly attaches output from package to
+                    // root of remaining plan.
+                    
+                    POLocalRearrange mlr = rearrange.clone();
+                    fixUpRearrange(mlr);
+
+                    // A specialized local rearrange operator will replace
+                    // the normal local rearrange in the map plan. This behaves
+                    // like the regular local rearrange in the getNext() 
+                    // as far as getting its input and constructing the 
+                    // "key" out of the input. It then returns a tuple with
+                    // two fields - the key in the first position and the
+                    // "value" inside a bag in the second position. This output
+                    // format resembles the format out of a Package. This output
+                    // will feed to the map foreach which expects this format.
+                    // If the key field isn't in the project of the combiner or map foreach,
+                    // it is added to the end (This is required so that we can 
+                    // set up the inner plan of the new Local Rearrange leaf in the map
+                    // and combine plan to contain just the project of the key).
+                    patchUpMap(mr.mapPlan, getPreCombinerLR(rearrange), mfe, mlr);
+                    POLocalRearrange clr = rearrange.clone();
+                    fixUpRearrange(clr);
+
+                    mr.combinePlan.add(clr);
+                    mr.combinePlan.connect(cfe, clr);
+                    
                     // Change the package operator in the reduce plan to
-                    // be the post combiner package, as it needs to act
+                    // be the POCombinerPackage, as it needs to act
                     // differently than the regular package operator.
-                    POPostCombinerPackage newPack =
-                        new POPostCombinerPackage(pack, bags);
-                    mr.reducePlan.replace(pack, newPack);
+                    POCombinerPackage newReducePack =
+                        new POCombinerPackage(pack, bags);
+                    mr.reducePlan.replace(pack, newReducePack);
                     
                     // the replace() above only changes
                     // the plan and does not change "inputs" to 
@@ -228,8 +270,8 @@
                     // set up "inputs" for the operator after
                     // package correctly
                     List<PhysicalOperator> packList = new ArrayList<PhysicalOperator>();
-                    packList.add(newPack);
-                    List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(newPack);
+                    packList.add(newReducePack);
+                    List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(newReducePack);
                     // there should be only one successor to package
                     sucs.get(0).setInputs(packList);
                 } catch (Exception e) {
@@ -239,6 +281,40 @@
         }
     }
 
+    /**
+     * @param mapPlan
+     * @param preCombinerLR
+     * @param mfe
+     * @param mlr
+     * @throws PlanException 
+     */
+    private void patchUpMap(PhysicalPlan mapPlan, POPreCombinerLocalRearrange preCombinerLR,
+            POForEach mfe, POLocalRearrange mlr) throws PlanException {
+        
+        POLocalRearrange oldLR = (POLocalRearrange)mapPlan.getLeaves().get(0);
+        mapPlan.replace(oldLR, preCombinerLR);
+        
+        mapPlan.add(mfe);
+        mapPlan.connect(preCombinerLR, mfe);
+        
+        mapPlan.add(mlr);
+        mapPlan.connect(mfe, mlr);
+    }
+
+    /**
+     * @param rearrange
+     * @return
+     */
+    private POPreCombinerLocalRearrange getPreCombinerLR(POLocalRearrange rearrange) {
+        
+        String scope = rearrange.getOperatorKey().scope;
+        POPreCombinerLocalRearrange pclr = new POPreCombinerLocalRearrange(
+                new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)),
+                rearrange.getRequestedParallelism(), rearrange.getInputs());
+        pclr.setPlans(rearrange.getPlans());
+        return pclr;
+    }
+
     /*
     private boolean onKeysOnly(PhysicalPlan pp) {
         // TODO
@@ -282,7 +358,7 @@
             // Don't know what this is, but it isn't algebraic
             return ExprType.NOT_ALGEBRAIC;
         }
-
+                
         // Check that it doesn't have anything in the nested plan that I
         // can't make algebraic.  At this point this is just filters and
         // foreach.  Filters are left out because they are not necessarily
@@ -326,43 +402,54 @@
     // Returns number of fields that this will project, including the added
     // key field if that is necessary
     private void fixUpForeachs(
+            POForEach mfe, // map foreach
             POForEach cfe, // combiner foreach
             POForEach rfe, // reducer foreach
             List<ExprType> exprs) throws PlanException {
+        List<PhysicalPlan> mPlans = mfe.getInputPlans();
         List<PhysicalPlan> cPlans = cfe.getInputPlans();
         List<PhysicalPlan> rPlans = rfe.getInputPlans();
         for (int i = 0; i < exprs.size(); i++) {
             if (exprs.get(i) == ExprType.ALGEBRAIC) {
-                changeFunc(cfe, cPlans.get(i), POUserFunc.INITIAL);
+                changeFunc(mfe, mPlans.get(i), POUserFunc.INITIAL);
+                changeFunc(cfe, cPlans.get(i), POUserFunc.INTERMEDIATE);
                 changeFunc(rfe, rPlans.get(i), POUserFunc.FINAL);
             }
         }
 
-        // Set flattens for combiner ForEach to false
-        List<Boolean> cfeFlattens = new ArrayList<Boolean>(cPlans.size());
+        // Set flattens for map and combiner ForEach to false
+        List<Boolean> feFlattens = new ArrayList<Boolean>(cPlans.size());
         for (int i = 0; i < cPlans.size(); i++) {
-            cfeFlattens.add(false);
+            feFlattens.add(false);
         }
-        cfe.setToBeFlattened(cfeFlattens);
+        mfe.setToBeFlattened(feFlattens);
+        cfe.setToBeFlattened(feFlattens);
 
-        // If the key field isn't in the project of the combiner foreach, add
-        // it to the end.
+        // If the key field isn't in the project of the combiner or map foreach, add
+        // it to the end (This is required so that we can set up the inner plan
+        // of the new Local Rearrange in the map and combine plan to contain just the
+        // project of the key).
         if (mKeyField == -1) {
-            PhysicalPlan newPlan = new PhysicalPlan();
-            String scope = cfe.getOperatorKey().scope;
-            POProject proj = new POProject(new OperatorKey(scope, 
-                NodeIdGenerator.getGenerator().getNextNodeId(scope)), -1, 0);
-            proj.setResultType(mKeyType);
-            newPlan.add(proj);
-            cfe.addInputPlan(newPlan, false);
+            addKeyProject(mfe);
+            addKeyProject(cfe);
             mKeyField = cPlans.size() - 1;
         }
 
-        // Change the plans on the reduce foreach to project from the column
-        // they are in.  UDFs will be left the same but their
+        // Change the plans on the reduce/combine foreach to project from the column
+        // they are in (we just want to take output from the combine and
+        // use that as input in the reduce/combine plan).  UDFs will be left the same but their
         // inputs altered.  Any straight projections will also be altered.
-        for (int i = 0; i < rPlans.size(); i++) {
-            List<PhysicalOperator> leaves = rPlans.get(i).getLeaves();
+        fixProjectAndInputs(cPlans);
+        fixProjectAndInputs(rPlans);
+    }
+
+    /**
+     * @param plans
+     * @throws PlanException 
+     */
+    private void fixProjectAndInputs(List<PhysicalPlan> plans) throws PlanException {
+        for (int i = 0; i < plans.size(); i++) {
+            List<PhysicalOperator> leaves = plans.get(i).getLeaves();
             if (leaves == null || leaves.size() != 1) {
                 throw new RuntimeException("Expected to find plan with single leaf!");
             }
@@ -378,15 +465,29 @@
                     leaf.getRequestedParallelism(), i);
                 proj.setResultType(DataType.BAG);
                 // Remove old connections and elements from the plan
-                rPlans.get(i).trimAbove(leaf);
-                rPlans.get(i).add(proj);
-                rPlans.get(i).connect(proj, leaf);
+                plans.get(i).trimAbove(leaf);
+                plans.get(i).add(proj);
+                plans.get(i).connect(proj, leaf);
                 List<PhysicalOperator> inputs =
                     new ArrayList<PhysicalOperator>(1);
                 inputs.add(proj);
                 leaf.setInputs(inputs);
             }
         }
+
+    }
+
+    /**
+     * @param fe
+     */
+    private void addKeyProject(POForEach fe) {
+        PhysicalPlan newForEachInnerPlan = new PhysicalPlan();
+        String scope = fe.getOperatorKey().scope;
+        POProject proj = new POProject(new OperatorKey(scope, 
+            NodeIdGenerator.getGenerator().getNextNodeId(scope)), -1, 0);
+        proj.setResultType(mKeyType);
+        newForEachInnerPlan.add(proj);
+        fe.addInputPlan(newForEachInnerPlan, false);
     }
 
     private void changeFunc(POForEach fe, PhysicalPlan plan, byte type) {
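
Taken together, the changes above rewrite one algebraic foreach into three.
For an illustrative script such as "b = group a by x; c = foreach b generate
group, COUNT(a);", the plans built here should look roughly like this
(inferred from patchUpMap() and the combine-plan wiring, so the exact
operator order is an assumption):

    map:     ... -> POPreCombinerLocalRearrange -> POForEach[COUNT.Initial] -> POLocalRearrange
    combine: POCombinerPackage -> POForEach[COUNT.Intermediate] -> POLocalRearrange
    reduce:  POCombinerPackage -> POForEach[COUNT.Final] -> ...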

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Dec 23 16:44:50 2008
@@ -335,7 +335,6 @@
                     jobConf.setCombinerClass(PigCombiner.Combine.class);
                     jobConf.set("pig.combinePlan", ObjectSerializer.serialize(mro.combinePlan));
                     jobConf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
-                    jobConf.setCombineOnceOnly(true);
                 } else if (mro.needsDistinctCombiner()) {
                     jobConf.setCombinerClass(DistinctCombiner.Combine.class);
                     log.info("Setting identity combiner class.");
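
The deleted setCombineOnceOnly(true) line above is the heart of PIG-563:
without it, Hadoop is free to invoke the combiner zero, one, or several
times over its own output, which is why every Algebraic now needs a real
Intermediate phase that can consume what it produced. A plain-Java sketch
of that property, with illustrative names and data:

    import java.util.Arrays;
    import java.util.List;

    public class RecombineSketch {
        // stands in for an Intermediate such as COUNT.Intermediate
        static long intermediate(List<Long> partials) {
            long sum = 0;
            for (long p : partials) sum += p;
            return sum;
        }
        public static void main(String[] args) {
            long once  = intermediate(Arrays.asList(1L, 1L, 1L)); // first pass
            long again = intermediate(Arrays.asList(once));       // re-combined
            System.out.println(once == again);                    // true
        }
    }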

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Tue Dec 23 16:44:50 2008
@@ -189,8 +189,12 @@
                             continue;
                         
                         if(redRes.returnStatus==POStatus.STATUS_ERR){
-                            IOException ioe = new IOException("Received Error while " +
-                                    "processing the reduce plan.");
+                            String msg = "Received Error while " +
+                            "processing the combine plan.";
+                            if(redRes.result != null) {
+                                msg += redRes.result;
+                            }
+                            IOException ioe = new IOException(msg);
                             throw ioe;
                         }
                     }

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Tue Dec 23 16:44:50 2008
@@ -29,7 +29,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.Pair;
@@ -75,7 +75,7 @@
             if(pkg != null) {
                 // if the POPackage is actually a POPostCombinerPackage, then we should
                 // just look for the corresponding LocalRearrange(s) in the combine plan
-                if(pkg instanceof POPostCombinerPackage) {
+                if(pkg instanceof POCombinerPackage) {
                     if(patchPackage(mr.combinePlan, pkg) != pkg.getNumInps()) {
                         throw new VisitorException("Unexpected problem while trying " +
                         		"to optimize (could not find LORearrange in combine plan)");
@@ -152,7 +152,7 @@
          * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitCombinerPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage)
          */
         @Override
-        public void visitCombinerPackage(POPostCombinerPackage pkg)
+        public void visitCombinerPackage(POCombinerPackage pkg)
                 throws VisitorException {
             this.pkg = pkg;
         }

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Tue Dec 23 16:44:50 2008
@@ -76,7 +76,7 @@
         //do nothing
     }
     
-    public void visitCombinerPackage(POPostCombinerPackage pkg) throws VisitorException{
+    public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
         //do nothing
     }
     
@@ -261,6 +261,15 @@
         
     }
 
+    /**
+     * @param preCombinerLocalRearrange
+     */
+    public void visitPreCombinerLocalRearrange(
+            POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+        // TODO Auto-generated method stub
+        
+    }
+
     public void visitCross(POCross cross) {
         // TODO Auto-generated method stub
         

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java Tue Dec 23 16:44:50 2008
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.NullableTuple;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.Pair;
-/**
- * The package operator that packages the globally rearranged tuples into
- * output format after the combiner stage.  It differs from POPackage in
- * that it does not use the index in the NullableTuple to find the bag to
- * put a tuple in.  Instead, the inputs are put in a bag corresponding to
- * their offset in the tuple.
- */
-public class POPostCombinerPackage extends POPackage {
-    /**
-     * 
-     */
-    private static final long serialVersionUID = 1L;
-
-    private final Log log = LogFactory.getLog(getClass());
-
-    private static BagFactory mBagFactory = BagFactory.getInstance();
-    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
-    private boolean[] mBags; // For each field, indicates whether or not it
-                             // needs to be put in a bag.
-
-    /**
-     * A new POPostCombinePackage will be constructed as a near clone of the
-     * provided POPackage.
-     * @param pkg POPackage to clone.
-     * @param bags for each field, indicates whether it should be a bag (true)
-     * or a simple field (false).
-     */
-    public POPostCombinerPackage(POPackage pkg, boolean[] bags) {
-        super(new OperatorKey(pkg.getOperatorKey().scope,
-            NodeIdGenerator.getGenerator().getNextNodeId(pkg.getOperatorKey().scope)),
-            pkg.getRequestedParallelism(), pkg.getInputs());
-        resultType = pkg.getResultType();
-        keyType = pkg.keyType;
-        numInputs = 1;
-        inner = new boolean[1];
-        for (int i = 0; i < pkg.inner.length; i++) {
-            inner[i] = true;
-        }
-        mBags = bags;
-    }
-
-    @Override
-    public String name() {
-        return "PostCombinerPackage" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString();
-    }
-
-    @Override
-    public Result getNext(Tuple t) throws ExecException {
-        int keyField = -1;
-        //Create numInputs bags
-        Object[] fields = new Object[mBags.length];
-        for (int i = 0; i < mBags.length; i++) {
-            if (mBags[i]) fields[i] = mBagFactory.newDefaultBag();
-        }
-        
-        // For each indexed tup in the inp, split them up and place their
-        // fields into the proper bags.  If the given field isn't a bag, just
-        // set the value as is.
-        while (tupIter.hasNext()) {
-            NullableTuple ntup = tupIter.next();
-            Tuple tup = (Tuple)ntup.getValueAsPigType();
-            // TODO: IMPORTANT ASSUMPTION: Currently we only combine in the
-            // group case and not in cogroups. So there should only
-            // be one LocalRearrange from which we get the keyInfo for
-            // which field in the value is in the key. This LocalRearrange
-            // has an index of -1. When we do support combiner in Cogroups
-            // THIS WILL NEED TO BE REVISITED.
-            Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
-                keyInfo.get(0); // assumption: only group are "combinable", hence index 0
-            Map<Integer, Integer> keyLookup = lrKeyInfo.second;
-            int tupIndex = 0; // an index for accessing elements from 
-                              // the value (tup) that we have currently
-            for(int i = 0; i < mBags.length; i++) {
-                Integer keyIndex = keyLookup.get(i);
-                if(keyIndex == null) {
-                    // the field for this index is not the
-                    // key - so just take it from the "value"
-                    // we were handed - Currently THIS HAS TO BE A BAG
-                    // In future if this changes, THIS WILL NEED TO BE
-                    // REVISITED.
-                    ((DataBag)fields[i]).add((Tuple)tup.get(tupIndex));
-                    tupIndex++;
-                } else {
-                    // the field for this index is in the key
-                    fields[i] = key;
-                }
-            }
-        }
-        
-        // The successor of the POPostCombinerPackage as of 
-        // now SHOULD be a POForeach which has been adjusted
-        // to look for the key in the right place - so we will
-        // NOT be adding the key in the result here but mere 
-        // putting all bags into a result tuple and returning it. 
-        Tuple res;
-        res = mTupleFactory.newTuple(mBags.length);
-        for (int i = 0; i < mBags.length; i++) res.set(i, fields[i]);
-        Result r = new Result();
-        r.result = res;
-        r.returnStatus = POStatus.STATUS_OK;
-        return r;
-
-    }
-
-}
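
This class appears to be renamed rather than dropped: POCombinerPackage,
imported and instantiated throughout this commit, presumably carries the
same offset-based repackaging forward (the new file itself is not in this
part of the commit mail). What getNext() above does can be pictured with a
small plain-Java simulation (illustrative data, not Pig classes):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class CombinerPackageSketch {
        public static void main(String[] args) {
            // mBags = {false, true}: field 0 holds the key, field 1 a bag
            Object key = "k";
            List<Object> values = Arrays.asList((Object) "v1", "v2");

            Object[] fields = new Object[2];
            List<Object> bag = new ArrayList<Object>();
            for (Object v : values) {
                bag.add(v);          // non-key fields collect into the bag
            }
            fields[0] = key;         // key field comes from the key itself
            fields[1] = bag;
            System.out.println(Arrays.deepToString(fields)); // [k, [v1, v2]]
        }
    }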

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/AVG.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/AVG.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/AVG.java Tue Dec 23 16:44:50 2008
@@ -26,11 +26,13 @@
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.WrappedIOException;
 import org.apache.pig.backend.executionengine.ExecException;
 
 
@@ -70,7 +72,7 @@
     }
 
     public String getIntermed() {
-        return Intermed.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -80,13 +82,26 @@
     static public class Initial extends EvalFunc<Tuple> {
         @Override
         public Tuple exec(Tuple input) throws IOException {
+            Tuple t = mTupleFactory.newTuple(2);
             try {
-                Tuple t = mTupleFactory.newTuple(2);
-                t.set(0, sum(input));
-                t.set(1, new Long(count(input)));
+                // input is a bag with one tuple containing
+                // the column we are trying to avg
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                DataByteArray dba = (DataByteArray)tp.get(0); 
+                t.set(0, dba != null ? Double.valueOf(dba.toString()) : null);
+                t.set(1, 1L);
+                return t;
+            } catch(NumberFormatException nfe) {
+                // invalid input,
+                // treat this input as null
+                try {
+                    t.set(0, null);
+                    t.set(1, 1L);
+                } catch (ExecException e) {
+                    throw WrappedIOException.wrap(e);
+                }
                 return t;
-            } catch(RuntimeException t) {
-                throw new RuntimeException(t.getMessage() + ": " + input);
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
                 oughtToBeEE.initCause(ee);
@@ -96,7 +111,7 @@
         }
     }
 
-    static public class Intermed extends EvalFunc<Tuple> {
+    static public class Intermediate extends EvalFunc<Tuple> {
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
@@ -151,8 +166,15 @@
         for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
             Tuple t = it.next();
             Double d = (Double)t.get(0);
-            if(d == null) continue;
-            sawNonNull = true;
+            // we count nulls in avg as contributing 0,
+            // a departure from SQL, for performance of
+            // COUNT(), which is implemented by just inspecting
+            // the size of the bag
+            if(d == null) {
+                d = 0.0;
+            } else {
+                sawNonNull = true;
+            }
             sum += d;
             count += (Long)t.get(1);
         }
@@ -183,13 +205,11 @@
         for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
             Tuple t = it.next();
             try{
-                Double d = DataType.toDouble(t.get(0));
+                DataByteArray dba = (DataByteArray)t.get(0);
+                Double d = dba != null ? Double.valueOf(dba.toString()) : null;
                 if (d == null) continue;
                 sawNonNull = true;
                 sum += d;
-            }catch(NumberFormatException nfe){
-                // do nothing - essentially treat this
-                // particular input as null
             }catch(RuntimeException exp) {
                 ExecException newE =  new ExecException("Error processing: " +
                     t.toString() + exp.getMessage(), exp);
@@ -203,7 +223,7 @@
             return null;
         }
     }
-    
+
     @Override
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
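
The net effect on AVG: Initial consumes the single-tuple bag the map hands
it and emits a (value, 1) pair, Intermediate adds (sum, count) pairs, and
Final divides. A plain-Java sketch of that algebra (illustrative, not the
Pig classes):

    import java.util.Arrays;
    import java.util.List;

    public class AvgAlgebraSketch {
        static double[] initial(double v) { return new double[] {v, 1}; }
        static double[] intermediate(List<double[]> partials) {
            double sum = 0, count = 0;
            for (double[] p : partials) { sum += p[0]; count += p[1]; }
            return new double[] {sum, count};
        }
        static double fin(List<double[]> partials) {
            double[] t = intermediate(partials);
            return t[0] / t[1];
        }
        public static void main(String[] args) {
            List<double[]> mapOut = Arrays.asList(initial(1), initial(2), initial(3));
            double[] partial = intermediate(mapOut);          // combiner output
            System.out.println(fin(Arrays.asList(partial)));  // 2.0
        }
    }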

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/COUNT.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/COUNT.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/COUNT.java Tue Dec 23 16:44:50 2008
@@ -41,7 +41,8 @@
     @Override
     public Long exec(Tuple input) throws IOException {
         try {
-            return count(input);
+            DataBag bag = (DataBag)input.get(0);
+            return bag.size();
         } catch (ExecException ee) {
             throw WrappedIOException.wrap("Caught exception in COUNT", ee);
         }
@@ -52,7 +53,7 @@
     }
 
     public String getIntermed() {
-        return Intermed.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -63,21 +64,20 @@
 
         @Override
         public Tuple exec(Tuple input) throws IOException {
-            try {
-                return mTupleFactory.newTuple(count(input));
-            } catch (ExecException ee) {
-                throw WrappedIOException.wrap(
-                    "Caught exception in COUNT.Initial", ee);
-            }
+            // Since Initial is guaranteed to be called
+            // only in the map, it will be called with an
+            // input of a bag with a single tuple - the 
+            // count should always be 1.
+            return mTupleFactory.newTuple(new Long(1));
         }
     }
 
-    static public class Intermed extends EvalFunc<Tuple> {
+    static public class Intermediate extends EvalFunc<Tuple> {
 
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
-                return mTupleFactory.newTuple(count(input));
+                return mTupleFactory.newTuple(sum(input));
             } catch (ExecException ee) {
                 throw WrappedIOException.wrap(
                     "Caught exception in COUNT.Intermed", ee);
@@ -97,17 +97,6 @@
         }
     }
 
-    static protected Long count(Tuple input) throws ExecException {
-        Object values = input.get(0);        
-        if (values instanceof DataBag)
-            return ((DataBag)values).size();
-        else if (values instanceof Map)
-            return new Long(((Map)values).size());
-        else
-            throw new ExecException("Cannot count a " +
-                DataType.findTypeName(values));
-    }
-
     static protected Long sum(Tuple input) throws ExecException, NumberFormatException {
         DataBag values = (DataBag)input.get(0);
         long sum = 0;
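
COUNT decomposes the same way: Initial can hard-code 1 because the map
feeds it a bag holding exactly one tuple, and Intermediate and Final both
just add up partial counts, so any number of combiner passes yields the
same answer. A plain-Java sketch (names and data are illustrative):

    import java.util.Arrays;
    import java.util.List;

    public class CountPhasesSketch {
        static long initial() { return 1L; }      // map: bag of one tuple
        static long sum(List<Long> partials) {    // Intermediate and Final
            long s = 0;
            for (long p : partials) s += p;
            return s;
        }
        public static void main(String[] args) {
            long c1 = sum(Arrays.asList(initial(), initial())); // map task 1
            long c2 = sum(Arrays.asList(initial(), initial())); // map task 2
            System.out.println(sum(Arrays.asList(c1, c2)));     // Final: 4
        }
    }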

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleAvg.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleAvg.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleAvg.java Tue Dec 23 16:44:50 2008
@@ -69,7 +69,7 @@
     }
 
     public String getIntermed() {
-        return Intermed.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -81,8 +81,12 @@
         public Tuple exec(Tuple input) throws IOException {
             try {
                 Tuple t = mTupleFactory.newTuple(2);
-                t.set(0, sum(input));
-                t.set(1, new Long(count(input)));
+                // input is a bag with one tuple containing
+                // the column we are trying to avg on
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                t.set(0, (Double)(tp.get(0)));
+                t.set(1, 1L);
                 return t;
             } catch(RuntimeException t) {
                 throw new RuntimeException(t.getMessage() + ": " + input);
@@ -95,7 +99,7 @@
         }
     }
 
-    static public class Intermed extends EvalFunc<Tuple> {
+    static public class Intermediate extends EvalFunc<Tuple> {
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
@@ -150,8 +154,15 @@
         for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
             Tuple t = it.next();
             Double d = (Double)t.get(0);
-            if(d == null) continue;
-            sawNonNull = true;
+            // we count nulls in avg as contributing 0,
+            // a departure from SQL, for performance of
+            // COUNT(), which is implemented by just inspecting
+            // the size of the bag
+            if(d == null) {
+                d = 0.0;
+            } else {
+                sawNonNull = true;
+            }
             sum += d;
             count += (Long)t.get(1);
         }

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMax.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMax.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMax.java Tue Dec 23 16:44:50 2008
@@ -51,7 +51,7 @@
     }
 
     public String getIntermed() {
-        return Initial.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -64,6 +64,25 @@
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
+                // input is a bag with one tuple containing
+                // the column we are trying to max on
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                return tfact.newTuple((Double)(tp.get(0)));
+            } catch (ExecException ee) {
+                IOException oughtToBeEE = new IOException();
+                oughtToBeEE.initCause(ee);
+                throw oughtToBeEE;
+            }
+        }
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
                 return tfact.newTuple(max(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
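
With Initial now assuming the single-tuple bag shape the map provides, the
new Intermediate class takes over the old bag-folding behaviour in the
combiner. MAX tolerates any number of such passes because max is
associative, commutative, and idempotent; the MIN variants below are
symmetric. A plain-Java sketch (illustrative, null handling omitted):

    import java.util.Arrays;
    import java.util.List;

    public class MaxAlgebraSketch {
        static double intermediate(List<Double> partials) {
            double m = Double.NEGATIVE_INFINITY;
            for (double p : partials) m = Math.max(m, p);
            return m;
        }
        public static void main(String[] args) {
            double c1 = intermediate(Arrays.asList(3.0, 7.0));   // first pass
            double c2 = intermediate(Arrays.asList(c1, 5.0));    // re-combined
            System.out.println(intermediate(Arrays.asList(c2))); // 7.0
        }
    }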

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMin.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMin.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMin.java Tue Dec 23 16:44:50 2008
@@ -50,7 +50,7 @@
     }
 
     public String getIntermed() {
-        return Initial.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -63,6 +63,25 @@
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
+                // input is a bag with one tuple containing
+                // the column we are trying to min on
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                return tfact.newTuple((Double)(tp.get(0)));
+            } catch (ExecException ee) {
+                IOException oughtToBeEE = new IOException();
+                oughtToBeEE.initCause(ee);
+                throw oughtToBeEE;
+            }
+        }
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
                 return tfact.newTuple(min(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleSum.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleSum.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleSum.java Tue Dec 23 16:44:50 2008
@@ -31,6 +31,7 @@
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.WrappedIOException;
 
 
 /**
@@ -54,7 +55,7 @@
     }
 
     public String getIntermed() {
-        return Initial.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -66,10 +67,28 @@
 
         @Override
         public Tuple exec(Tuple input) throws IOException {
+            // Initial is called in the map - for SUM
+            // we just send the tuple down
+            try {
+                // input is a bag with one tuple containing
+                // the column we are trying to sum
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                return tfact.newTuple((Double)( tp.get(0)));
+            } catch (ExecException e) {
+                throw WrappedIOException.wrap("Caught exception in DoubleSum.Initial", e);
+            }
+        }
+    }
+    static public class Intermediate extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
             try {
                 return tfact.newTuple(sum(input));
             } catch (ExecException ee) {
-                IOException oughtToBeEE = new IOException();
+                IOException oughtToBeEE = new IOException("Caught exception in DoubleSum.Intermediate");
                 oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }
@@ -81,7 +100,7 @@
             try {
                 return sum(input);
             } catch (ExecException ee) {
-                IOException oughtToBeEE = new IOException();
+                IOException oughtToBeEE = new IOException("Caught exception in DoubleSum.Final");
                 oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatAvg.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatAvg.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatAvg.java Tue Dec 23 16:44:50 2008
@@ -66,7 +66,7 @@
     }
 
     public String getIntermed() {
-        return Intermed.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -78,8 +78,13 @@
         public Tuple exec(Tuple input) throws IOException {
             try {
                 Tuple t = mTupleFactory.newTuple(2);
-                t.set(0, sum(input));
-                t.set(1, new Long(count(input)));
+                // input is a bag with one tuple containing
+                // the column we are trying to avg on
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                Float f = (Float)(tp.get(0));
+                t.set(0, f != null ? new Double(f) : null);
+                t.set(1, 1L);
                 return t;
             } catch(RuntimeException t) {
                 throw new RuntimeException(t.getMessage() + ": " + input);
@@ -92,7 +97,7 @@
         }
     }
 
-    static public class Intermed extends EvalFunc<Tuple> {
+    static public class Intermediate extends EvalFunc<Tuple> {
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
@@ -147,8 +152,15 @@
         for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
             Tuple t = it.next();
             Double d = (Double)t.get(0);
-            if(d == null) continue;
-            sawNonNull = true;
+            // we count nulls in avg as contributing 0,
+            // a departure from SQL, for performance of
+            // COUNT(), which is implemented by just inspecting
+            // the size of the bag
+            if(d == null) {
+                d = 0.0;
+            } else {
+                sawNonNull = true;
+            }
             sum += d;
             count += (Long)t.get(1);
         }

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMax.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMax.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMax.java Tue Dec 23 16:44:50 2008
@@ -50,7 +50,7 @@
     }
 
     public String getIntermed() {
-        return Initial.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -63,6 +63,25 @@
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
+                // input is a bag with one tuple containing
+                // the column we are trying to max on
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                return tfact.newTuple((Float)(tp.get(0)));
+            } catch (ExecException ee) {
+                IOException oughtToBeEE = new IOException();
+                oughtToBeEE.initCause(ee);
+                throw oughtToBeEE;
+            }
+        }
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
                 return tfact.newTuple(max(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMin.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMin.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMin.java Tue Dec 23 16:44:50 2008
@@ -50,7 +50,7 @@
     }
 
     public String getIntermed() {
-        return Initial.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -63,6 +63,25 @@
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
+                // input is a bag with one tuple containing
+                // the column we are trying to min on
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                return tfact.newTuple((Float)(tp.get(0)));
+            } catch (ExecException ee) {
+                IOException oughtToBeEE = new IOException();
+                oughtToBeEE.initCause(ee);
+                throw oughtToBeEE;
+            }
+        }
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
                 return tfact.newTuple(min(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatSum.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatSum.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatSum.java Tue Dec 23 16:44:50 2008
@@ -50,7 +50,7 @@
     }
 
     public String getIntermed() {
-        return Initial.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -62,10 +62,32 @@
 
         @Override
         public Tuple exec(Tuple input) throws IOException {
+            // Initial is called in the map - for SUM
+            // we just send the tuple down
             try {
-                return tfact.newTuple(sum(input));
+                // input is a bag with one tuple containing
+                // the column we are trying to sum
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                // send down a double since intermediate
+                // would be sending a double
+                Float f = (Float)tp.get(0);
+                return tfact.newTuple(f != null ? 
+                        new Double(f) : null);
+            } catch (ExecException e) {
+                throw WrappedIOException.wrap("Caught exception in FloatSum.Initial", e);
+            }
+        }
+    }
+    static public class Intermediate extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                return tfact.newTuple(sumDoubles(input));
             } catch (ExecException ee) {
-                throw WrappedIOException.wrap("Caught exception in FloatSum.Initial", ee);
+                throw WrappedIOException.wrap("Caught exception in FloatSum.Intermediate", ee);
             }
         }
     }
@@ -73,42 +95,48 @@
         @Override
         public Double exec(Tuple input) throws IOException {
             try {
-                // Can't just call sum, because the intermediate results are
-                // now Doubles insteads of Floats.
-                DataBag values = (DataBag)input.get(0);
-        
-                // if we were handed an empty bag, return NULL
-                // this is in compliance with SQL standard
-                if(values.size() == 0) {
-                    return null;
-                }
-
-                double sum = 0;
-                boolean sawNonNull = false;
-                for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-                    Tuple t = (Tuple) it.next();
-                    try {
-                        Double d = (Double)(t.get(0));
-                        if (d == null) continue;
-                        sawNonNull = true;
-                        sum += d;
-                    }catch(RuntimeException exp) {
-                        throw WrappedIOException.wrap(
-                            "Caught exception in FloatSum.Final", exp);
-                    }
-                }
-        
-                
-                if(sawNonNull) {
-                    return new Double(sum);
-                } else {
-                    return null;
-                }
+                return sumDoubles(input);                
             } catch (ExecException ee) {
                 throw WrappedIOException.wrap("Caught exception in FloatSum.Final", ee);
             }
         }
     }
+    
+    static protected Double sumDoubles(Tuple input) throws ExecException {
+        // Can't just call sum, because the intermediate results are
+        // now Doubles instead of Floats.
+        DataBag values = (DataBag)input.get(0);
+
+        // if we were handed an empty bag, return NULL
+        // this is in compliance with SQL standard
+        if(values.size() == 0) {
+            return null;
+        }
+
+        double sum = 0;
+        boolean sawNonNull = false;
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = (Tuple) it.next();
+            try {
+                Double d = (Double)(t.get(0));
+                if (d == null) continue;
+                sawNonNull = true;
+                sum += d;
+            } catch (RuntimeException exp) {
+                ExecException newE = new ExecException("Error processing: " +
+                        t.toString() + ": " + exp.getMessage(), exp);
+                throw newE;
+            }
+        }
+
+        
+        if(sawNonNull) {
+            return new Double(sum);
+        } else {
+            return null;
+        }
+
+    }
 
     static protected  Double sum(Tuple input) throws ExecException {
         DataBag values = (DataBag)input.get(0);

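FloatSum.Initial widens each Float to a Double before emitting it, so
every later phase sees a single type no matter how many combiner passes
run. A hypothetical harness walking one value through the three phases
by hand (TupleFactory and BagFactory are Pig's APIs; the class and its
main are illustrative only):

    import org.apache.pig.builtin.FloatSum;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class FloatSumPhases {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();
            BagFactory bf = BagFactory.getInstance();

            // map input: a bag holding the single tuple (1.5f)
            DataBag row = bf.newDefaultBag();
            row.add(tf.newTuple((Object)new Float(1.5f)));
            Tuple partial = new FloatSum.Initial().exec(tf.newTuple((Object)row));

            // combiner input: a bag of Initial outputs - already Doubles,
            // so a second combiner pass would see exactly the same shape
            DataBag partials = bf.newDefaultBag();
            partials.add(partial);
            Tuple combined = new FloatSum.Intermediate().exec(tf.newTuple((Object)partials));

            // reduce input: a bag of Intermediate outputs
            DataBag combineds = bf.newDefaultBag();
            combineds.add(combined);
            Double result = new FloatSum.Final().exec(tf.newTuple((Object)combineds));
            System.out.println(result);   // 1.5
        }
    }
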
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/IntAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/IntAvg.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/IntAvg.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/IntAvg.java Tue Dec 23 16:44:50 2008
@@ -66,7 +66,7 @@
     }
 
     public String getIntermed() {
-        return Intermed.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -76,13 +76,19 @@
     static public class Initial extends EvalFunc<Tuple> {
         @Override
         public Tuple exec(Tuple input) throws IOException {
+            
             try {
                 Tuple t = mTupleFactory.newTuple(2);
-                t.set(0, sum(input));
-                t.set(1, new Long(count(input)));
+                // input is a bag with one tuple containing
+                // the column we are trying to avg on
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                Integer i = (Integer)tp.get(0);
+                t.set(0, i != null ? new Long(i) : null);
+                t.set(1, 1L);
                 return t;
-            } catch(RuntimeException t) {
-                throw new RuntimeException(t.getMessage() + ": " + input);
+            } catch(RuntimeException e) {
+                throw new RuntimeException(e.getMessage() + ": " + input);
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
                 oughtToBeEE.initCause(ee);
@@ -92,7 +98,7 @@
         }
     }
 
-    static public class Intermed extends EvalFunc<Tuple> {
+    static public class Intermediate extends EvalFunc<Tuple> {
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
@@ -147,8 +153,15 @@
         for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
             Tuple t = it.next();
             Long l = (Long)t.get(0);
-            if(l == null) continue;
-            sawNonNull = true;
+            // we count nulls in avg as contributing 0 to the sum;
+            // a departure from SQL, made for the performance of
+            // COUNT(), which is implemented by just inspecting the
+            // size of the bag
+            if(l == null) {
+                l = 0L;
+            } else {
+                sawNonNull = true;
+            }
             sum += l;
             count += (Long)t.get(1);
         }
@@ -180,6 +193,10 @@
             Tuple t = it.next();
             try {
                 Integer i = (Integer)(t.get(0));
+                // we count nulls in avg as contributing 0 to the sum;
+                // a departure from SQL, made for the performance of
+                // COUNT(), which is implemented by just inspecting the
+                // size of the bag
                 if (i == null) continue;
                 sawNonNull = true;
                 sum += i;

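AVG cannot be combined by summing alone, so IntAvg threads a
(sum, count) pair through the pipeline: Initial emits (value, 1),
Intermediate adds pairs component-wise, and Final divides. A sketch of
the pair merge under the null handling shown above (combinePair is a
hypothetical helper, and the all-null case that combine() tracks with
sawNonNull is omitted here):

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class AvgPairSketch {
        private static TupleFactory tf = TupleFactory.getInstance();

        // merge two (sum, count) partials; a null sum contributes 0
        // but its count still counts, as in the diff above
        static Tuple combinePair(Tuple a, Tuple b) throws ExecException {
            Long sa = (Long)a.get(0);
            Long sb = (Long)b.get(0);
            long sum = (sa == null ? 0L : sa) + (sb == null ? 0L : sb);
            long count = (Long)a.get(1) + (Long)b.get(1);
            Tuple out = tf.newTuple(2);
            out.set(0, sum);
            out.set(1, count);
            return out;   // Final later computes (double)sum / count
        }
    }
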
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMax.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMax.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMax.java Tue Dec 23 16:44:50 2008
@@ -50,7 +50,7 @@
     }
 
     public String getIntermed() {
-        return Initial.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -63,6 +63,25 @@
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
+                // input is a bag with one tuple containing
+                // the column we are trying to max on
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                return tfact.newTuple((Integer)(tp.get(0)));
+            } catch (ExecException ee) {
+                IOException oughtToBeEE = new IOException();
+                oughtToBeEE.initCause(ee);
+                throw oughtToBeEE;
+            }
+        }
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
                 return tfact.newTuple(max(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMin.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMin.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMin.java Tue Dec 23 16:44:50 2008
@@ -52,7 +52,7 @@
     }
 
     public String getIntermed() {
-        return Initial.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -65,6 +65,25 @@
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
+                // input is a bag with one tuple containing
+                // the column we are trying to min on
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                return tfact.newTuple((Integer)(tp.get(0)));
+            } catch (ExecException ee) {
+                IOException oughtToBeEE = new IOException();
+                oughtToBeEE.initCause(ee);
+                throw oughtToBeEE;
+            }
+        }
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
                 return tfact.newTuple(min(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/IntSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/IntSum.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/IntSum.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/IntSum.java Tue Dec 23 16:44:50 2008
@@ -49,7 +49,7 @@
     }
 
     public String getIntermed() {
-        return Initial.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -61,10 +61,33 @@
 
         @Override
         public Tuple exec(Tuple input) throws IOException {
+            // Initial is called in the map - for SUM
+            // we just send the tuple down
             try {
-                return tfact.newTuple(sum(input));
+                // input is a bag with one tuple containing
+                // the column we are trying to sum
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                Integer i = (Integer)tp.get(0);
+                return tfact.newTuple(i != null ? 
+                        new Long(i) : null);
+            } catch (NumberFormatException nfe) {
+                // treat this particular input as null
+                return tfact.newTuple(null);
+            } catch (ExecException e) {
+                throw WrappedIOException.wrap("Caught exception in IntSum.Initial", e);
+            }
+        }
+    }
+    static public class Intermediate extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                return tfact.newTuple(sumLongs(input));
             } catch (ExecException ee) {
-                throw WrappedIOException.wrap("Caught exception in IntSum.Initial", ee);
+                throw WrappedIOException.wrap("Caught exception in IntSum.Intermediate", ee);
             }
         }
     }
@@ -72,43 +95,48 @@
         @Override
         public Long exec(Tuple input) throws IOException {
             try {
-                // Can't just call sum, because the intermediate results are
-                // now Longs insteads of Integers.
-                DataBag values = (DataBag)input.get(0);
-        
-                // if we were handed an empty bag, return NULL
-                // this is in compliance with SQL standard
-                if(values.size() == 0) {
-                    return null;
-                }
-
-                long sum = 0;
-                boolean sawNonNull = false;
-                for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-                    Tuple t = (Tuple) it.next();
-                    try {
-                        Long l = (Long)(t.get(0));
-                        if (l == null) continue;
-                        sawNonNull = true;
-                        sum += l;
-                    }catch(RuntimeException exp) {
-                        throw WrappedIOException.wrap(
-                            "Caught exception in IntSum.Final", exp);
-                    }
-                }
-        
-                
-                if(sawNonNull) {
-                    return new Long(sum);
-                } else {
-                    return null;
-                }
-            } catch (ExecException ee) {
-                throw WrappedIOException.wrap("Caught exception in IntSum.Final", ee);
+                return sumLongs(input);
+            } catch (ExecException e) {
+                throw WrappedIOException.wrap("Caught exception in IntSum.Final", e);
             }
         }
     }
 
+    static protected Long sumLongs(Tuple input) throws ExecException {
+        // Can't just call sum, because the intermediate results are
+        // now Longs instead of Integers.
+        DataBag values = (DataBag)input.get(0);
+
+        // if we were handed an empty bag, return NULL
+        // this is in compliance with SQL standard
+        if(values.size() == 0) {
+            return null;
+        }
+
+        long sum = 0;
+        boolean sawNonNull = false;
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = (Tuple) it.next();
+            try {
+                Long l = (Long)(t.get(0));
+                if (l == null) continue;
+                sawNonNull = true;
+                sum += l;
+            } catch (RuntimeException exp) {
+                throw new ExecException(
+                        "Error processing: " +
+                        t.toString() + ": " + exp.getMessage(), exp);
+            }
+        }
+
+        
+        if(sawNonNull) {
+            return new Long(sum);
+        } else {
+            return null;
+        }
+    }
+
     static protected  Long sum(Tuple input) throws ExecException {
         DataBag values = (DataBag)input.get(0);
         

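The property the multi-invocation combiner depends on is that
Intermediate composes with itself: running it over its own outputs must
give the same total as a single pass. A hypothetical check for IntSum
(IntSum.Intermediate is the committed class; the harness around it is
illustrative):

    import org.apache.pig.builtin.IntSum;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class IntSumRecombine {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();
            BagFactory bf = BagFactory.getInstance();
            IntSum.Intermediate im = new IntSum.Intermediate();

            // two partial sums, as Longs (Initial already widened them)
            DataBag partials = bf.newDefaultBag();
            partials.add(tf.newTuple((Object)Long.valueOf(3L)));
            partials.add(tf.newTuple((Object)Long.valueOf(4L)));
            Tuple once = im.exec(tf.newTuple((Object)partials));

            // feed the result through a second combiner pass
            DataBag again = bf.newDefaultBag();
            again.add(once);
            Tuple twice = im.exec(tf.newTuple((Object)again));

            System.out.println(once.get(0) + " == " + twice.get(0)); // 7 == 7
        }
    }
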
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/LongAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/LongAvg.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/LongAvg.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/LongAvg.java Tue Dec 23 16:44:50 2008
@@ -66,7 +66,7 @@
     }
 
     public String getIntermed() {
-        return Intermed.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -78,8 +78,12 @@
         public Tuple exec(Tuple input) throws IOException {
             try {
                 Tuple t = mTupleFactory.newTuple(2);
-                t.set(0, sum(input));
-                t.set(1, new Long(count(input)));
+                // input is a bag with one tuple containing
+                // the column we are trying to avg on
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                t.set(0, (Long)(tp.get(0)));
+                t.set(1, 1L);
                 return t;
             } catch(RuntimeException t) {
                 throw new RuntimeException(t.getMessage() + ": " + input);
@@ -92,7 +96,7 @@
         }
     }
 
-    static public class Intermed extends EvalFunc<Tuple> {
+    static public class Intermediate extends EvalFunc<Tuple> {
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
@@ -147,8 +151,15 @@
         for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
             Tuple t = it.next();
             Long l = (Long)t.get(0);
-            if(l == null) continue;
-            sawNonNull = true;
+            // we count nulls in avg as contributing 0 to the sum;
+            // a departure from SQL, made for the performance of
+            // COUNT(), which is implemented by just inspecting the
+            // size of the bag
+            if(l == null) {
+                l = 0L;
+            } else {
+                sawNonNull = true;
+            }
             sum += l;
             count += (Long)t.get(1);
         }

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMax.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMax.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMax.java Tue Dec 23 16:44:50 2008
@@ -50,7 +50,7 @@
     }
 
     public String getIntermed() {
-        return Initial.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -63,6 +63,25 @@
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
+                // input is a bag with one tuple containing
+                // the column we are trying to max on
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                return tfact.newTuple((Long)(tp.get(0)));
+            } catch (ExecException ee) {
+                IOException oughtToBeEE = new IOException();
+                oughtToBeEE.initCause(ee);
+                throw oughtToBeEE;
+            }
+        }
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
                 return tfact.newTuple(max(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMin.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMin.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMin.java Tue Dec 23 16:44:50 2008
@@ -50,7 +50,7 @@
     }
 
     public String getIntermed() {
-        return Initial.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -63,6 +63,25 @@
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
+                // input is a bag with one tuple containing
+                // the column we are trying to min on
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                return tfact.newTuple((Long)(tp.get(0)));
+            } catch (ExecException ee) {
+                IOException oughtToBeEE = new IOException();
+                oughtToBeEE.initCause(ee);
+                throw oughtToBeEE;
+            }
+        }
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
                 return tfact.newTuple(min(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/LongSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/LongSum.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/LongSum.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/LongSum.java Tue Dec 23 16:44:50 2008
@@ -28,6 +28,7 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.WrappedIOException;
 
 
 /**
@@ -51,7 +52,7 @@
     }
 
     public String getIntermed() {
-        return Initial.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -63,10 +64,28 @@
 
         @Override
         public Tuple exec(Tuple input) throws IOException {
+            // Initial is called in the map - for SUM
+            // we just send the tuple down
+            try {
+                // input is a bag with one tuple containing
+                // the column we are trying to sum
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                return tfact.newTuple((Long)tp.get(0));
+            } catch (ExecException e) {
+                throw WrappedIOException.wrap("Caught exception in LongSum.Initial", e);
+            }
+        }
+    }
+    static public class Intermediate extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
             try {
                 return tfact.newTuple(sum(input));
             } catch (ExecException ee) {
-                IOException oughtToBeEE = new IOException();
+                IOException oughtToBeEE = new IOException("Caught exception in LongSum.Intermediate");
                 oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }
@@ -78,7 +97,7 @@
             try {
                 return sum(input);
             } catch (ExecException ee) {
-                IOException oughtToBeEE = new IOException();
+                IOException oughtToBeEE = new IOException("Caught exception in LongSum.Final");
                 oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }

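LongSum's wrapped exceptions now carry messages. The initCause() dance
here and the WrappedIOException.wrap() calls used elsewhere in this
commit are equivalent: IOException only gained a (String, Throwable)
constructor in Java 6, so on the Java 5 code path the cause has to be
attached separately. A sketch of the pattern (the wrap helper below is
illustrative, not Pig's):

    import java.io.IOException;

    import org.apache.pig.backend.executionengine.ExecException;

    public class WrapSketch {
        static IOException wrap(String msg, ExecException ee) {
            IOException oughtToBeEE = new IOException(msg);
            // attach the real cause so the stack trace survives
            oughtToBeEE.initCause(ee);
            return oughtToBeEE;
        }
    }
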
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/MAX.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/MAX.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/MAX.java Tue Dec 23 16:44:50 2008
@@ -27,6 +27,7 @@
 import org.apache.pig.FuncSpec;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -56,7 +57,7 @@
     }
 
     public String getIntermed() {
-        return Initial.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -69,7 +70,30 @@
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
-                return tfact.newTuple(max(input));
+                // input is a bag with one tuple containing
+                // the column we are trying to max on 
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                DataByteArray dba = (DataByteArray)tp.get(0); 
+                return tfact.newTuple(dba != null ?
+                        Double.valueOf(dba.toString()) : null);
+            } catch (NumberFormatException e) {
+                return tfact.newTuple(null);
+            } catch (ExecException ee) {
+                IOException oughtToBeEE = new IOException();
+                oughtToBeEE.initCause(ee);
+                throw oughtToBeEE;
+            }
+        }
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                return tfact.newTuple(maxDoubles(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
                 oughtToBeEE.initCause(ee);
@@ -81,7 +105,7 @@
         @Override
         public Double exec(Tuple input) throws IOException {
             try {
-                return max(input);
+                return maxDoubles(input);
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
                 oughtToBeEE.initCause(ee);
@@ -104,13 +128,49 @@
         for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
             Tuple t = it.next();
             try {
-                Double d = DataType.toDouble(t.get(0));
+                DataByteArray dba = (DataByteArray)t.get(0);
+                Double d = dba != null ? Double.valueOf(dba.toString()) : null;
+                if (d == null) continue;
+                sawNonNull = true;
+                curMax = java.lang.Math.max(curMax, d);
+            } catch (RuntimeException exp) {
+                ExecException newE = new ExecException("Error processing: " +
+                    t.toString() + ": " + exp.getMessage());
+                newE.initCause(exp);
+                throw newE;
+            }
+        }
+
+        if(sawNonNull) {
+            return new Double(curMax);
+        } else {
+            return null;
+        }
+    }
+    
+    // same as the function above except all its inputs are
+    // already Doubles - use this for better performance, since
+    // we don't have to inspect each object's type to convert it
+    // to a double. This is the path taken when the Initial,
+    // Intermediate and Final versions are used.
+    static protected Double maxDoubles(Tuple input) throws ExecException {
+        DataBag values = (DataBag)input.get(0);
+        
+        // if we were handed an empty bag, return NULL
+        // this is in compliance with SQL standard
+        if(values.size() == 0) {
+            return null;
+        }
+
+        double curMax = Double.NEGATIVE_INFINITY;
+        boolean sawNonNull = false;
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            try {
+                Double d = (Double)t.get(0);
                 if (d == null) continue;
                 sawNonNull = true;
                 curMax = java.lang.Math.max(curMax, d);
-            }catch(NumberFormatException nfe){
-                // do nothing - essentially treat this
-                // particular input as null
             } catch (RuntimeException exp) {
                 ExecException newE = new ExecException("Error processing: " +
                     t.toString() + exp.getMessage());

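MAX now keeps two code paths: max() parses every DataByteArray on the
untyped path, while maxDoubles() assumes the Initial/Intermediate/Final
chain has already produced Doubles and just casts. A small illustration
of the difference (the harness is hypothetical; DataByteArray is Pig's):

    import org.apache.pig.data.DataByteArray;

    public class MaxPaths {
        public static void main(String[] args) {
            // bytearray path: every value must be parsed before comparing
            DataByteArray dba = new DataByteArray("3.14");
            Double parsed = Double.valueOf(dba.toString());

            // typed path (maxDoubles): values are already Doubles,
            // so each iteration is just a cast and a compare
            Object fromTuple = Double.valueOf(3.14);
            Double typed = (Double)fromTuple;

            System.out.println(Math.max(parsed, typed));   // 3.14
        }
    }
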
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/MIN.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/MIN.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/MIN.java Tue Dec 23 16:44:50 2008
@@ -27,6 +27,7 @@
 import org.apache.pig.FuncSpec;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -56,7 +57,7 @@
     }
 
     public String getIntermed() {
-        return Initial.class.getName();
+        return Intermediate.class.getName();
     }
 
     public String getFinal() {
@@ -69,7 +70,31 @@
         @Override
         public Tuple exec(Tuple input) throws IOException {
             try {
-                return tfact.newTuple(min(input));
+                // input is a bag with one tuple containing
+                // the column we are trying to min on
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                DataByteArray dba = (DataByteArray)tp.get(0); 
+                return tfact.newTuple(dba != null ?
+                        Double.valueOf(dba.toString()) : null);
+            } catch (NumberFormatException e) {
+                // invalid input, send null
+                return tfact.newTuple(null);
+            } catch (ExecException ee) {
+                IOException oughtToBeEE = new IOException();
+                oughtToBeEE.initCause(ee);
+                throw oughtToBeEE;
+            }
+        }
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple> {
+        private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                return tfact.newTuple(minDoubles(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
                 oughtToBeEE.initCause(ee);
@@ -81,7 +106,7 @@
         @Override
         public Double exec(Tuple input) throws IOException {
             try {
-                return min(input);
+                return minDoubles(input);
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
                 oughtToBeEE.initCause(ee);
@@ -104,13 +129,49 @@
         for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
             Tuple t = it.next();
             try {
-                Double d = DataType.toDouble(t.get(0));
+                DataByteArray dba = (DataByteArray)t.get(0);
+                Double d = dba != null ? Double.valueOf(dba.toString()): null;
+                if (d == null) continue;
+                sawNonNull = true;
+                curMin = java.lang.Math.min(curMin, d);
+            } catch (RuntimeException exp) {
+                ExecException newE = new ExecException("Error processing: " +
+                    t.toString() + ": " + exp.getMessage());
+                newE.initCause(exp);
+                throw newE;
+            }
+        }
+    
+        if(sawNonNull) {
+            return new Double(curMin);
+        } else {
+            return null;
+        }
+    }
+    
+    // same as the function above except all its inputs are
+    // already Doubles - use this for better performance, since
+    // we don't have to inspect each object's type to convert it
+    // to a double. This is the path taken when the Initial,
+    // Intermediate and Final versions are used.
+    static protected Double minDoubles(Tuple input) throws ExecException {
+        DataBag values = (DataBag)input.get(0);
+        
+        // if we were handed an empty bag, return NULL
+        // this is in compliance with SQL standard
+        if(values.size() == 0) {
+            return null;
+        }
+
+        double curMin = Double.POSITIVE_INFINITY;
+        boolean sawNonNull = false;
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            try {
+                Double d = (Double)t.get(0);
                 if (d == null) continue;
                 sawNonNull = true;
                 curMin = java.lang.Math.min(curMin, d);
-            }catch(NumberFormatException nfe){
-                // do nothing - essentially treat this
-                // particular input as null
             } catch (RuntimeException exp) {
                 ExecException newE =  new ExecException("Error processing: " +
                     t.toString() + exp.getMessage());