Posted to commits@pig.apache.org by pr...@apache.org on 2009/04/22 02:34:22 UTC

svn commit: r767337 [1/2] - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/backend/hadoop/executionengine...

Author: pradeepkth
Date: Wed Apr 22 00:34:21 2009
New Revision: 767337

URL: http://svn.apache.org/viewvc?rev=767337&view=rev
Log:
PIG-514: COUNT returns no results as a result of two filter statements in FOREACH (pradeepkth)

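For context: the failure fixed here occurs when a nested FILTER inside a FOREACH filters away every tuple of a group, so the Project(*) feeding an aggregate sees an EOP before any input. A script of roughly this shape (a hypothetical reconstruction, not the script attached to the JIRA) exercises the path: b = group a by $0; c = foreach b { d = filter a by $1 > 0; e = filter d by $1 < 10; generate group, COUNT(e); }; The commit introduces PORelationToExprProject to turn that first EOP into an empty bag, and hardens the builtin aggregates against empty input bags.
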
Added:
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestRelationToExprProject.java
    hadoop/pig/trunk/test/org/apache/pig/test/data/TestRelationToExprProjectInput.txt
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java
    hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java
    hadoop/pig/trunk/test/org/apache/pig/test/Util.java
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Apr 22 00:34:21 2009
@@ -42,6 +42,8 @@
 PIG-745: Add DataType.toString() to force basic types to chararray, useful
 for UDFs that want to handle all simple types as strings (ciemo via gates).
 
+PIG-514: COUNT returns no results as a result of two filter statements in
+FOREACH (pradeepkth)
 
 Release 0.2.0 - Unreleased
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Wed Apr 22 00:34:21 2009
@@ -864,8 +864,14 @@
     @Override
     public void visit(LOProject op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        POProject exprOp = new POProject(new OperatorKey(scope, nodeGen
+        POProject exprOp;
+        if(op.isSendEmptyBagOnEOP()) {
+            exprOp = new PORelationToExprProject(new OperatorKey(scope, nodeGen
                 .getNextNodeId(scope)), op.getRequestedParallelism());
+        } else {
+            exprOp = new POProject(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), op.getRequestedParallelism());
+        }
         exprOp.setResultType(op.getType());
         exprOp.setColumns((ArrayList)op.getProjection());
         exprOp.setStar(op.isStar());

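Note: the isSendEmptyBagOnEOP() flag consulted above is set, within this commit, only by the QueryParser.jjt change further down - that is, for the Project(*) the parser introduces after a relational operator inside a nested plan. Every other LOProject still translates to a plain POProject.
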
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Wed Apr 22 00:34:21 2009
@@ -287,6 +287,17 @@
         return ret;
     }
 
+    /**
+     * Reset internal state in an operator.  For use in nested pipelines
+     * where operators like limit and sort may need to reset their state.
+     * Limit needs it because it needs to know it's seeing a fresh set of
+     * input.  Blocking operators like sort and distinct need it because they
+     * may not have drained their previous input due to a limit and thus need
+     * to be told to drop their old input and start over.
+     */
+    public void reset() {
+    }
+
     public static void setReporter(PigProgressable reporter) {
         PhysicalOperator.reporter = reporter;
     }

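To illustrate the contract this hook establishes, here is a minimal standalone sketch (invented names, not Pig source): a limit-like operator keeps per-input state which the enclosing pipeline must clear through reset() before each fresh set of input.

    import java.util.Arrays;

    // Toy stand-in for a nested LIMIT: reset() plays the role of
    // PhysicalOperator.reset(), clearing state between input tuples.
    class LimitLike {
        private final long limit;
        private long seen = 0;

        LimitLike(long limit) { this.limit = limit; }

        // Called before each fresh set of input, as POForEach does below.
        void reset() { seen = 0; }

        // True while we are still under the limit for the current input.
        boolean accept(String tuple) { return seen++ < limit; }
    }

    public class ResetContractDemo {
        public static void main(String[] args) {
            LimitLike limit = new LimitLike(2);
            for (String t : Arrays.asList("a", "b", "c"))
                System.out.println(t + " -> " + limit.accept(t)); // a, b pass; c is dropped
            limit.reset(); // without this, the next group would see a stale count
            for (String t : Arrays.asList("d", "e", "f"))
                System.out.println(t + " -> " + limit.accept(t)); // d, e pass again
        }
    }
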
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Wed Apr 22 00:34:21 2009
@@ -56,11 +56,13 @@
     private static final long serialVersionUID = 1L;
 
 	private static TupleFactory tupleFactory = TupleFactory.getInstance();
+	
+	protected static BagFactory bagFactory = BagFactory.getInstance();
 
 	private boolean resultSingleTupleBag = false;
 	
     //The column to project
-    ArrayList<Integer> columns;
+    protected ArrayList<Integer> columns;
     
     //True if we are in the middle of streaming tuples
     //in a bag
@@ -181,13 +183,21 @@
         Result res = processInputBag();
         if(res.returnStatus!=POStatus.STATUS_OK)
             return res;
-        DataBag inpBag = (DataBag) res.result;
+        return(consumeInputBag(res));
+    }
 
+    /**
+     * @param input
+     * @throws ExecException 
+     */
+    protected Result consumeInputBag(Result input) throws ExecException {
+        DataBag inpBag = (DataBag) input.result;
+        Result retVal = new Result();
         if(isInputAttached() || star){
-            res.result = inpBag;
-            res.returnStatus = POStatus.STATUS_OK;
+            retVal.result = inpBag;
+            retVal.returnStatus = POStatus.STATUS_OK;
             detachInput();
-            return res;
+            return retVal;
         }
         
         DataBag outBag;
@@ -201,7 +211,7 @@
                 tmpTuple.set(i, tuple.get(columns.get(i)));
             outBag = new SingleTupleBag(tmpTuple);
         } else {
-            outBag = BagFactory.getInstance().newDefaultBag();
+            outBag = bagFactory.newDefaultBag();
             for (Tuple tuple : inpBag) {
                 Tuple tmpTuple = tupleFactory.newTuple(columns.size());
                 for (int i = 0; i < columns.size(); i++)
@@ -209,9 +219,9 @@
                 outBag.add(tmpTuple);
             }
         }
-        res.result = outBag;
-        res.returnStatus = POStatus.STATUS_OK;
-        return res;
+        retVal.result = outBag;
+        retVal.returnStatus = POStatus.STATUS_OK;
+        return retVal;
     }
 
     @Override
@@ -374,7 +384,7 @@
         return clone;
     }
     
-    private Result processInputBag() throws ExecException {
+    protected Result processInputBag() throws ExecException {
         
         Result res = new Result();
         if (input==null && (inputs == null || inputs.size()==0)) {

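Taken together, the POProject changes are a template-method refactoring: getNext(DataBag) now delegates to a protected consumeInputBag(Result), and processInputBag(), columns and the bag factory become visible to subclasses, so that the new PORelationToExprProject (added below) can reuse the projection logic and override only the handling of EOP.
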
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java?rev=767337&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java Wed Apr 22 00:34:21 2009
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
+
+import java.util.ArrayList;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Implements a specialized form of POProject which is
+ * used *ONLY* in the following case:
+ * This project is Project(*) introduced after a relational operator
+ * to supply a bag as output (as an expression). This project is either
+ * providing the bag as input to a successor expression operator or is 
+ * itself the leaf in an inner plan.
+ * If the predecessor relational operator sends an EOP,
+ * then send an empty bag first to signal "empty" output
+ * and then send an EOP.
+
+ * NOTE: A Project(*) of return type BAG whose predecessor is
+ * from an outside plan (i.e. not in the same inner plan as the project)
+ * will NOT lead us here. So a query like:
+ * a = load 'baginp.txt' as (b:bag{t:tuple()}); b = foreach a generate $0; dump b;
+ * will go through a regular project (without the following flag)
+ */
+public class PORelationToExprProject extends POProject {
+    
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    boolean sendEmptyBagOnEOP = false;
+    
+    public PORelationToExprProject(OperatorKey k) {
+        this(k,-1,0);
+    }
+
+    public PORelationToExprProject(OperatorKey k, int rp) {
+        this(k, rp, 0);
+    }
+    
+    public PORelationToExprProject(OperatorKey k, int rp, int col) {
+        super(k, rp, col);
+    }
+
+    public PORelationToExprProject(OperatorKey k, int rp, ArrayList<Integer> cols) {
+        super(k, rp, cols);
+    }
+
+    @Override
+    public String name() {
+        
+        return "RelationToExpressionProject" + "[" + DataType.findTypeName(resultType) + "]" + ((star) ? "[*]" : columns) + " - " + mKey.toString();
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        // for now the specialization in this class
+        // does not affect the way visitors visit it - so
+        // we can just use visitProject()
+        v.visitProject(this);
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#reset()
+     */
+    @Override
+    public void reset() {
+        // the foreach in which this operator is
+        // present is starting with a new set of inputs
+        // if we see an EOP from the predecessor *first*
+        // (i.e. we do not see any other input before an EOP
+        // and only see an EOP - this can happen if a Filter
+        // is the predecessor and it filters away all its input)
+        // we should send an empty bag. Set a flag which can be 
+        // checked if an EOP is encountered.
+        sendEmptyBagOnEOP = true;
+    }
+    
+    @Override
+    public Result getNext(DataBag db) throws ExecException {
+        Result input = processInputBag();
+        if(input.returnStatus!=POStatus.STATUS_OK) {
+            if(input.returnStatus == POStatus.STATUS_EOP && sendEmptyBagOnEOP)  {
+                // we received an EOP from the predecessor
+                // since the successor in the pipeline is
+                // expecting a bag, send an empty bag
+                input.result = bagFactory.newDefaultBag();
+                input.returnStatus = POStatus.STATUS_OK;
+                // we should send EOP the next time we are called
+                // if the foreach in which this operator is present
+                // calls this.getNext(bag) with new inputs then
+                // this flag will be reset in this.reset()
+            } else {
+                // since we are sending down some result (empty bag or otherwise)
+                // we should not be sending an empty bag on EOP any more UNLESS
+                // we are processing new inputs (see reset())
+                sendEmptyBagOnEOP = false;
+                return input;
+            }
+        }
+        Result r = consumeInputBag(input);
+        // since we are sending down some result (empty bag or otherwise)
+        // we should not be sending an empty bag on EOP any more UNLESS
+        // we are processing new inputs (see reset())
+        sendEmptyBagOnEOP = false;
+        return(r);
+    }
+       
+    @Override
+    public PORelationToExprProject clone() throws CloneNotSupportedException {
+        ArrayList<Integer> cols = new ArrayList<Integer>(columns.size());
+        // Can reuse the same Integer objects, as they are immutable
+        for (Integer i : columns) {
+            cols.add(i);
+        }
+        PORelationToExprProject clone = new PORelationToExprProject(new OperatorKey(mKey.scope, 
+            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
+            requestedParallelism, cols);
+        clone.cloneHelper(this);
+        clone.star = star;
+        clone.overloaded = overloaded;
+        return clone;
+    }
+    
+}

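The interplay of reset() and getNext(DataBag) above is a small state machine. A self-contained toy model of it (invented names; an upstream EOP is modeled as null and a bag as a List) behaves as follows:

    import java.util.ArrayDeque;
    import java.util.Collections;
    import java.util.Deque;
    import java.util.List;

    // Models PORelationToExprProject's flag: the first EOP after a reset()
    // is converted into an empty bag; later EOPs pass through unchanged.
    public class EmptyBagOnEopDemo {
        private final Deque<List<Integer>> upstream = new ArrayDeque<List<Integer>>();
        private boolean sendEmptyBagOnEop = false;

        void reset() { sendEmptyBagOnEop = true; }  // a new input tuple was attached
        void offer(List<Integer> bag) { upstream.add(bag); }

        // Returns a bag, or null to stand in for EOP.
        List<Integer> getNext() {
            if (upstream.isEmpty()) {                // predecessor sent EOP
                if (sendEmptyBagOnEop) {
                    sendEmptyBagOnEop = false;       // only once per reset()
                    return Collections.<Integer>emptyList(); // signal "empty" output
                }
                return null;                         // plain EOP
            }
            sendEmptyBagOnEop = false;               // real output was produced
            return upstream.poll();
        }

        public static void main(String[] args) {
            EmptyBagOnEopDemo p = new EmptyBagOnEopDemo();
            p.reset();                        // e.g. a nested filter dropped every tuple
            System.out.println(p.getNext());  // [] - COUNT sees an empty bag, not an EOP
            System.out.println(p.getNext());  // null - the real EOP follows
        }
    }
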
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Wed Apr 22 00:34:21 2009
@@ -38,8 +38,10 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
@@ -53,6 +55,7 @@
     private static final long serialVersionUID = 1L;
     
     protected List<PhysicalPlan> inputPlans;
+    protected List<PhysicalOperator> opsToBeReset;
     protected Log log = LogFactory.getLog(getClass());
     protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
     //Since the plan has a generate, this needs to be maintained
@@ -104,6 +107,7 @@
         super(k, rp);
         setUpFlattens(isToBeFlattened);
         this.inputPlans = inp;
+        opsToBeReset = new ArrayList<PhysicalOperator>();
         getLeaves();
     }
 
@@ -194,7 +198,10 @@
             }
             
             attachInputToPlans((Tuple) inp.result);
-            
+            for (PhysicalOperator po : opsToBeReset) {
+                po.reset();
+            }
+
             res = processPlan();
             
             processingPlan = true;
@@ -427,6 +434,18 @@
             noItems = 0;
             resultTypes = null;
         }
+        
+        if(inputPlans != null) {
+            for (PhysicalPlan pp : inputPlans) {
+                try {
+                    ResetFinder lf = new ResetFinder(pp, opsToBeReset);
+                    lf.visit();
+                } catch (VisitorException ve) {
+                    String errMsg = "Internal Error:  Unexpected error looking for nested operators which need to be reset in FOREACH";
+                    throw new RuntimeException(errMsg, ve);
+                }
+            }
+        }
     }
     
     public List<PhysicalPlan> getInputPlans() {
@@ -499,9 +518,15 @@
                 flattens.add(b);
             }
         }
+        
+        List<PhysicalOperator> ops = new ArrayList<PhysicalOperator>(opsToBeReset.size());
+        for (PhysicalOperator op : opsToBeReset) {
+            ops.add(op);
+        }
         POForEach clone = new POForEach(new OperatorKey(mKey.scope, 
                 NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
                 requestedParallelism, plans, flattens);
+        clone.setOpsToBeReset(ops);
         clone.setResultType(getResultType());
         return clone;
     }
@@ -522,4 +547,58 @@
             }
         }
     }
+
+    /**
+     * Visits a pipeline and collects the operators which need to have
+     * reset() called on them between input tuples: limit, sort and
+     * distinct nodes, and any PORelationToExprProject.
+     */
+    private class ResetFinder extends PhyPlanVisitor {
+
+        ResetFinder(PhysicalPlan plan, List<PhysicalOperator> toBeReset) {
+            super(plan,
+                new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+
+        @Override
+        public void visitDistinct(PODistinct d) throws VisitorException {
+            // FIXME: add only if limit is present
+            opsToBeReset.add(d);
+        }
+
+        @Override
+        public void visitLimit(POLimit limit) throws VisitorException {
+            opsToBeReset.add(limit);
+        }
+
+        @Override
+        public void visitSort(POSort sort) throws VisitorException {
+            // FIXME: add only if limit is present
+            opsToBeReset.add(sort);
+        }
+        
+        /* (non-Javadoc)
+         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
+         */
+        @Override
+        public void visitProject(POProject proj) throws VisitorException {
+            if(proj instanceof PORelationToExprProject) {
+                opsToBeReset.add(proj);
+            }
+        }
+    }
+
+    /**
+     * @return the opsToBeReset
+     */
+    public List<PhysicalOperator> getOpsToBeReset() {
+        return opsToBeReset;
+    }
+
+    /**
+     * @param opsToBeReset the opsToBeReset to set
+     */
+    public void setOpsToBeReset(List<PhysicalOperator> opsToBeReset) {
+        this.opsToBeReset = opsToBeReset;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java Wed Apr 22 00:34:21 2009
@@ -86,8 +86,11 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to avg
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                DataByteArray dba = (DataByteArray)tp.get(0); 
+                DataByteArray dba = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    dba = (DataByteArray)tp.get(0);
+                }
                 t.set(0, dba != null ? Double.valueOf(dba.toString()) : null);
                 t.set(1, 1L);
                 return t;

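The same defensive pattern - check bg.iterator().hasNext() before calling next(), and fall back to a null value when the bag is empty - is applied uniformly to every builtin aggregate modified below (the typed Avg/Max/Min/Sum variants plus MAX, MIN, SUM, StringMax and StringMin). COUNT, next, is the one exception: its Initial returns 0 rather than null for an empty bag.
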
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java Wed Apr 22 00:34:21 2009
@@ -72,8 +72,10 @@
             // Since Initial is guaranteed to be called
             // only in the map, it will be called with an
             // input of a bag with a single tuple - the 
-            // count should always be 1.
-            return mTupleFactory.newTuple(new Long(1));
+            // count should always be 1 if the bag is non-empty
+            DataBag bag = (DataBag)input.get(0);
+            return mTupleFactory.newTuple(bag.iterator().hasNext()? 
+                    new Long(1) : new Long(0));
         }
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java Wed Apr 22 00:34:21 2009
@@ -83,8 +83,12 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to avg on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                t.set(0, (Double)(tp.get(0)));
+                Double d = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    d = (Double)(tp.get(0));
+                }
+                t.set(0, d);
                 t.set(1, 1L);
                 return t;
             } catch (ExecException ee) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java Wed Apr 22 00:34:21 2009
@@ -70,8 +70,12 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to max on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                return tfact.newTuple((Double)(tp.get(0)));
+                Double d = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    d = (Double)(tp.get(0));
+                }
+                return tfact.newTuple(d);
             } catch (ExecException ee) {
                 throw ee;
             } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java Wed Apr 22 00:34:21 2009
@@ -69,8 +69,12 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to min on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                return tfact.newTuple((Double)(tp.get(0)));
+                Double d = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    d = (Double)(tp.get(0));
+                }
+                return tfact.newTuple(d);
             } catch (ExecException ee) {
                 throw ee;
             } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java Wed Apr 22 00:34:21 2009
@@ -76,8 +76,12 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to sum
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                return tfact.newTuple((Double)( tp.get(0)));
+                Double d = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    d = (Double)( tp.get(0));
+                }
+                return tfact.newTuple(d);
             } catch (ExecException e) {
                 throw e;
             } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java Wed Apr 22 00:34:21 2009
@@ -80,8 +80,11 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to avg on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                Float f = (Float)(tp.get(0));
+                Float f = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    f = (Float)(tp.get(0));
+                }
                 t.set(0, f != null ? new Double(f) : null);
                 t.set(1, 1L);
                 return t;

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java Wed Apr 22 00:34:21 2009
@@ -69,8 +69,12 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to max on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                return tfact.newTuple((Float)(tp.get(0)));
+                Float f = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    f = (Float)(tp.get(0));
+                }
+                return tfact.newTuple(f);
             } catch (ExecException ee) {
                 throw ee;
             } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java Wed Apr 22 00:34:21 2009
@@ -69,8 +69,12 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to min on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                return tfact.newTuple((Float)(tp.get(0)));
+                Float f = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    f = (Float)(tp.get(0));
+                }
+                return tfact.newTuple(f);
             } catch (ExecException ee) {
                 throw ee;
             } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java Wed Apr 22 00:34:21 2009
@@ -73,10 +73,13 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to sum
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-				// send down a double since intermediate
-				// would  be sending a double
-                Float f = (Float)tp.get(0);
+                Float f = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    f = (Float)tp.get(0);
+                }
+                // send down a double since intermediate
+                // would be sending a double
                 return tfact.newTuple(f != null ? 
                         new Double(f) : null);
             } catch (ExecException e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java Wed Apr 22 00:34:21 2009
@@ -81,8 +81,11 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to avg on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                Integer i = (Integer)tp.get(0);
+                Integer i = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    i = (Integer)tp.get(0);
+                }
                 t.set(0, i != null ? new Long(i): null);
                 t.set(1, 1L);
                 return t;

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java Wed Apr 22 00:34:21 2009
@@ -69,8 +69,12 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to max on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                return tfact.newTuple((Integer)(tp.get(0)));
+                Integer i = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    i = (Integer)(tp.get(0));
+                }
+                return tfact.newTuple(i);
             } catch (ExecException ee) {
                 throw ee;
             } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java Wed Apr 22 00:34:21 2009
@@ -71,8 +71,12 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to min on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                return tfact.newTuple((Integer)(tp.get(0)));
+                Integer i = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    i = (Integer)(tp.get(0));
+                }
+                return tfact.newTuple(i);
             } catch (ExecException ee) {
                 throw ee;
             } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java Wed Apr 22 00:34:21 2009
@@ -72,8 +72,11 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to sum
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                Integer i = (Integer)tp.get(0);
+                Integer i = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    i = (Integer)tp.get(0);
+                }
                 return tfact.newTuple(i != null ? 
                         new Long(i) : null);
             }catch(NumberFormatException nfe){

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java Wed Apr 22 00:34:21 2009
@@ -80,8 +80,12 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to avg on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                t.set(0, (Long)(tp.get(0)));
+                Long l = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    l = (Long)(tp.get(0));
+                }
+                t.set(0, l);
                 t.set(1, 1L);
                 return t;
             } catch (ExecException ee) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java Wed Apr 22 00:34:21 2009
@@ -69,8 +69,12 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to max on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                return tfact.newTuple((Long)(tp.get(0)));
+                Long l = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    l = (Long)(tp.get(0));
+                }
+                return tfact.newTuple(l);
             } catch (ExecException ee) {
                 throw ee;
             } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java Wed Apr 22 00:34:21 2009
@@ -69,8 +69,12 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to min on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                return tfact.newTuple((Long)(tp.get(0)));
+                Long l = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    l = (Long)(tp.get(0)); 
+                }
+                return tfact.newTuple(l);
             } catch (ExecException ee) {
                 throw ee;
             } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java Wed Apr 22 00:34:21 2009
@@ -73,8 +73,12 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to sum
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                return tfact.newTuple( (Long)tp.get(0));
+                Long l = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    l = (Long)tp.get(0);
+                }
+                return tfact.newTuple(l);
             } catch (ExecException e) {
                 throw e;
             } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java Wed Apr 22 00:34:21 2009
@@ -76,8 +76,11 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to max on 
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                DataByteArray dba = (DataByteArray)tp.get(0); 
+                DataByteArray dba = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    dba = (DataByteArray)tp.get(0);
+                }
                 return tfact.newTuple(dba != null ?
                         Double.valueOf(dba.toString()): null);
             } catch (NumberFormatException e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java Wed Apr 22 00:34:21 2009
@@ -76,8 +76,11 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to min on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                DataByteArray dba = (DataByteArray)tp.get(0); 
+                DataByteArray dba = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    dba = (DataByteArray)tp.get(0);
+                }
                 return tfact.newTuple(dba != null?
                         Double.valueOf(dba.toString()) : null);
             } catch (NumberFormatException e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java Wed Apr 22 00:34:21 2009
@@ -77,8 +77,11 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to sum
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                DataByteArray dba = (DataByteArray)tp.get(0); 
+                DataByteArray dba = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    dba = (DataByteArray)tp.get(0); 
+                }
                 return tfact.newTuple(dba != null?
                         Double.valueOf(dba.toString()): null);
             }catch(NumberFormatException nfe){

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java Wed Apr 22 00:34:21 2009
@@ -65,8 +65,12 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to max on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                return tfact.newTuple((String)(tp.get(0)));
+                String s = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    s = (String)(tp.get(0));
+                }
+                return tfact.newTuple(s);
             } catch (ExecException ee) {
                 throw ee;
             } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java Wed Apr 22 00:34:21 2009
@@ -67,8 +67,12 @@
                 // input is a bag with one tuple containing
                 // the column we are trying to min on
                 DataBag bg = (DataBag) input.get(0);
-                Tuple tp = bg.iterator().next();
-                return tfact.newTuple((String)(tp.get(0)));
+                String s = null;
+                if(bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    s = (String)(tp.get(0));
+                }
+                return tfact.newTuple(s);
             } catch (ExecException ee) {
                 throw ee;
             } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java Wed Apr 22 00:34:21 2009
@@ -36,7 +36,10 @@
  * will be done on this bag to disk.
  */
 public class NonSpillableDataBag implements DataBag {
-
+    // the reason this class does NOT extend DefaultAbstractBag
+    // is that we don't want to bloat this class with members it
+    // does not need (DefaultAbstractBag has many members related
+    // to spilling which are not needed here)
     /**
      * 
      */
@@ -180,6 +183,14 @@
             }
         }
     }
+    
+    /* (non-Javadoc)
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object obj) {
+        return compareTo(obj) == 0;
+    }
 
     @Override
     public int compareTo(Object other) {

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java Wed Apr 22 00:34:21 2009
@@ -68,6 +68,8 @@
     private boolean mSentinel;
     private boolean mOverloaded = false;
 
+    private boolean sendEmptyBagOnEOP = false;
+
     /**
      * 
      * @param plan
@@ -460,4 +462,18 @@
         return clone;
     }
 
+    /**
+     * @param sendEmptyBagOnEOP the sendEmptyBagOnEOP to set
+     */
+    public void setSendEmptyBagOnEOP(boolean sendEmptyBagOnEOP) {
+        this.sendEmptyBagOnEOP = sendEmptyBagOnEOP;
+    }
+
+    /**
+     * @return the sendEmptyBagOnEOP
+     */
+    public boolean isSendEmptyBagOnEOP() {
+        return sendEmptyBagOnEOP;
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Wed Apr 22 00:34:21 2009
@@ -3405,6 +3405,18 @@
                 } else {
                     item = new LOProject(lp, new OperatorKey(scope, getNextId()), op, -1);
                     ((LOProject)item).setStar(true);
+                    // This project is Project(*) introduced after a relational operator
+                    // to supply a bag as output (as an expression). This project is either
+                    // providing the bag as input to a successor expression operator or is 
+                    // itself the leaf in an inner plan.
+                    // If the predecessor relational operator sends an EOP
+                    // then send an empty bag first to signal "empty" output
+                    // and then send an EOP
+                    
+                    // A query like:
+                    // a = load 'baginp.txt' as (b:bag{t:tuple()}); b = foreach a generate $0; dump b;
+                    // will go through a regular project (without the following flag)
+                    ((LOProject)item).setSendEmptyBagOnEOP(true);
                     log.debug("Set star to true");
                     item.setAlias(t1.image);
                 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Wed Apr 22 00:34:21 2009
@@ -510,6 +510,132 @@
         }
     
     }
+    
+    /**
+     * Test the case where an empty bag is given as input to 
+     * the Initial function and the output is fed to Intermediate
+     * function whose output is fed to the Final function
+     * @throws Exception
+     */
+    @Test
+    public void testAggEmptyBagWithCombiner() throws Exception {
+        
+        for (String[] aggGroup : aggs) {
+            String[] aggFinalTypes = null; // will contain AVGFinal, DoubleAvgFinal etc
+            String[] aggInitialTypes = null; // will contain AVGInitial, DoubleAvgInitial etc
+            String[] aggIntermediateTypes = null; // will contain AVGIntermediate, DoubleAvgIntermediate etc
+            for (String stage: stages) {
+                String[] aggTypesArray = null;
+                if(stage.equals("Initial")) {
+                    aggInitialTypes = new String[aggGroup.length];
+                    aggTypesArray = aggInitialTypes;
+                } else if (stage.equals("Intermediate")) {
+                    aggIntermediateTypes = new String[aggGroup.length];
+                    aggTypesArray = aggIntermediateTypes;
+                } else  {// final 
+                    aggFinalTypes = new String[aggGroup.length];
+                    aggTypesArray = aggFinalTypes;
+                }
+
+                for (int i = 0; i < aggTypesArray.length; i++) {
+                    aggTypesArray[i] = aggGroup[i] + stage;
+                }
+            }
+            for(int k = 0; k < aggFinalTypes.length; k++) {
+                EvalFunc<?> aggInitial = evalFuncMap.get(aggInitialTypes[k]);
+                // To test this case, first <Agg>Initial is called with an empty bag
+                // as input. This is done in two iterations of 5 calls.
+                // The output from <Agg>Initial for the first half of inputs is
+                // put into one bag and the next half into another. Then these two
+                // bags are provided as inputs to two separate calls of <Agg>Intermediate.
+                // The outputs from the two calls to <Agg>Intermediate are put into a bag 
+                // and sent as input to <Agg>Final
+                
+                DataBag  intermediateInputBg1 = bagFactory.newDefaultBag();
+                DataBag  intermediateInputBg2 = bagFactory.newDefaultBag();
+                Tuple outputTuple = null;
+                for(int i = 0; i < 10; i++) {
+                    // create empty bag input to be provided as input
+                    // argument to the "Initial" function
+                    DataBag initialInputBg = bagFactory.newDefaultBag();
+                    Tuple initialInputTuple = tupleFactory.newTuple(initialInputBg);
+                    
+                    if(i < 5) {
+                        outputTuple = (Tuple)aggInitial.exec(initialInputTuple);
+                        // check that output is null for all aggs except COUNT
+                        // COUNT will give an output of 0 for empty bag input
+                        checkZeroOrNull(aggInitial, outputTuple.get(0));
+                        intermediateInputBg1.add(outputTuple);
+                    } else {
+                        outputTuple = (Tuple)aggInitial.exec(initialInputTuple);
+                        // check that output is null for all aggs except COUNT
+                        // COUNT will give an output of 0 for empty bag input
+                        checkZeroOrNull(aggInitial, outputTuple.get(0));
+                        intermediateInputBg2.add(outputTuple);
+                    }
+                }
+
+                EvalFunc<?> aggIntermediate = evalFuncMap.get(aggIntermediateTypes[k]);
+                DataBag finalInputBg = bagFactory.newDefaultBag();
+                Tuple intermediateInputTuple = tupleFactory.newTuple(intermediateInputBg1);
+                outputTuple = (Tuple)aggIntermediate.exec(intermediateInputTuple);
+                // check that output is null for all aggs except COUNT
+                // COUNT will give an output of 0 for empty bag input
+                checkZeroOrNull(aggIntermediate, outputTuple.get(0));
+                finalInputBg.add(outputTuple);
+                intermediateInputTuple = tupleFactory.newTuple(intermediateInputBg2);
+                outputTuple = (Tuple)aggIntermediate.exec(intermediateInputTuple);
+                // check that output is null for all aggs except COUNT
+                // COUNT will give an output of 0 for empty bag input
+                checkZeroOrNull(aggIntermediate, outputTuple.get(0));
+                finalInputBg.add(outputTuple);
+                
+                Tuple finalInputTuple = tupleFactory.newTuple(finalInputBg);
+                
+                EvalFunc<?> aggFinal = evalFuncMap.get(aggFinalTypes[k]);
+                Object output = aggFinal.exec(finalInputTuple);
+                // check that output is null for all aggs except COUNT
+                // COUNT will give an output of 0 for empty bag input
+                checkZeroOrNull(aggFinal, output);
+            }    
+        }
+    
+    }
+
+    /**
+     * Test the case where an empty bag is given as input to the non
+     * combiner version of aggregate functions
+     * @throws Exception if there are issues executing the aggregate function
+     */
+    @Test
+    public void testAggEmptyBag() throws Exception {
+        
+        for (String[] aggGroup : aggs) {
+            
+            for(int k = 0; k < aggGroup.length; k++) {
+                EvalFunc<?> agg = evalFuncMap.get(aggGroup[k]);
+
+                // call agg with empty bag as input
+                DataBag inputBag = bagFactory.newDefaultBag();
+                Tuple inputTuple = tupleFactory.newTuple(inputBag);
+                
+                Object output = agg.exec(inputTuple);
+                // check that output is null for all aggs except COUNT
+                // COUNT will give an output of 0 for empty bag input
+                checkZeroOrNull(agg, output);
+            }    
+        }
+    
+    }
+
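+    /**
+     * Asserts the expected result for an empty-bag input: 0L if the
+     * aggregate is a COUNT variant, null for all other aggregates.
+     */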
+    private void checkZeroOrNull(EvalFunc<?> func, Object output) {
+        if(func.getClass().getName().contains("COUNT")) {
+            assertEquals(Long.valueOf(0), output);
+        } else {
+            assertNull(output);
+        }
+    }
+
 
     // Builtin MATH Functions
     // =======================

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestRelationToExprProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestRelationToExprProject.java?rev=767337&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestRelationToExprProject.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestRelationToExprProject.java Wed Apr 22 00:34:21 2009
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.apache.pig.ExecType.MAPREDUCE;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Test PORelationToExprProject which is a special project
+ * introduced to handle the following case:
+ * This project is a Project(*) introduced after a relational operator
+ * to supply a bag as output (as an expression). The project either
+ * provides the bag as input to a successor expression operator or is
+ * itself the leaf in an inner plan.
+ * If the predecessor relational operator sends an EOP,
+ * this project first sends an empty bag to signal "empty" output
+ * and then sends an EOP.
+ *
+ * NOTE: A Project(*) of return type BAG whose predecessor is
+ * from an outside plan (i.e. not in the same inner plan as the project)
+ * will NOT lead us here. So a query like:
+ * a = load 'baginp.txt' as (b:bag{t:tuple()}); b = foreach a generate $0; dump b;
+ * will go through a regular project (without this empty-bag handling)
+ */
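+// For contrast, a query of the shape exercised by the tests below, where the
+// projection of the inner filter's result IS a PORelationToExprProject (a
+// sketch only; the relation and field names are illustrative):
+//   a = load 'data' as (col1:int, col2:int);
+//   b = group a by col1;
+//   c = foreach b { f = filter a by col2 == 1; generate group, COUNT(f); };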
+public class TestRelationToExprProject extends TestCase {
+
+    private MiniCluster cluster = MiniCluster.buildCluster();
+    private PigServer pigServer;
+    private static final String TEST_FILTER_COUNT3_INPUT = "test/org/apache/pig/test/data/TestRelationToExprProjectInput.txt";
+    
+    /* (non-Javadoc)
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Before
+    protected void setUp() throws Exception {
+        pigServer = new PigServer(MAPREDUCE, cluster.getProperties());
+    }
+    
+    /* (non-Javadoc)
+     * @see junit.framework.TestCase#tearDown()
+     */
+    @After
+    protected void tearDown() throws Exception {
+        pigServer.shutdown();
+    }
+    
+    // based on the script provided in JIRA issue PIG-514:
+    // tests that when a filter inside a foreach filters away all tuples
+    // for a group, an empty bag is still provided to udfs whose
+    // input is the filter
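+    // For group 2 no tuple satisfies col2!=1, so filter_notone produces an
+    // empty bag and COUNT(filter_notone) should return 0L rather than the
+    // group's output row being dropped.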
+    @Test
+    public void testFilterCount1() throws IOException, ParseException {
+        
+        String[] inputData = new String[] {"1\t1\t3","1\t2\t3", "2\t1\t3", "2\t1\t3"};
+        Util.createInputFile(cluster, "test.txt", inputData);
+        String script = "test   = load 'test.txt' as (col1: int, col2: int, col3: int);" +
+        		"test2 = group test by col1;" +
+        		"test3 = foreach test2 {" +
+        		"        filter_one    = filter test by (col2==1);" +
+        		"        filter_notone = filter test by (col2!=1);" +
+        		"        generate group as col1, COUNT(filter_one) as cnt_one, COUNT(filter_notone) as cnt_notone;};";
+        Util.registerQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("test3");
+        Tuple[] expected = new DefaultTuple[2];
+        expected[0] = (Tuple) Util.getPigConstant("(1,1L,1L)");
+        expected[1] = (Tuple) Util.getPigConstant("(2,2L,0L)");
+        Object[] results = new Object[2];
+        int i = 0;
+        while(it.hasNext()) {
+            if(i == 2) {
+                fail("Got more tuples than expected!");
+            }
+            Tuple t = it.next();
+            if(t.get(0).equals(1)) {
+                // this is the first tuple
+                results[0] = t;
+            } else {
+                results[1] = t;
+            }
+            i++;
+        }
+        for (int j = 0; j < expected.length; j++) {
+            assertEquals(expected[j], results[j]);
+        }
+        Util.deleteFile(cluster, "test.txt");
+    }
+    
+    // based on jira PIG-710
+    // tests that when a filter inside a foreach filters away all tuples
+    // for a group, an empty bag is still provided to udfs whose
+    // input is the filter
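+    // Ids 'b' and 'd' have no rows matching 'hello', so their matchedcount
+    // is expected to be 0L while their rows are still emitted.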
+    @Test
+    public void testFilterCount2() throws IOException, ParseException {
+        Util.createInputFile(cluster, "filterbug.data", new String[] {
+                "a\thello" ,
+                "a\tgoodbye" ,
+                "b\tgoodbye" ,
+                "c\thello" ,
+                "c\thello" ,
+                "c\thello" ,
+                "d\twhat"
+        });
+        String query = "A = load 'filterbug.data' using PigStorage() as ( id:chararray, str:chararray );" +
+        		"B = group A by ( id );" +
+        		"Cfiltered = foreach B {" +
+        		"        D = filter A by (" +
+        		"                str matches 'hello'" +
+        		"                );" +
+        		"        matchedcount = COUNT(D);" +
+        		"        generate" +
+        		"                group," +
+        		"                matchedcount as matchedcount," +
+        		"                A.str;" +
+        		"        };";  
+        Util.registerQuery(pigServer, query);
+        Iterator<Tuple> it = pigServer.openIterator("Cfiltered");
+        Map<String, Tuple> expected = new HashMap<String, Tuple>();
+        expected.put("a", (Tuple) Util.getPigConstant("('a',1L,{('hello'),('goodbye')})"));
+        expected.put("b", (Tuple) Util.getPigConstant("('b',0L,{('goodbye')})"));
+        expected.put("c", (Tuple) Util.getPigConstant("('c',3L,{('hello'),('hello'),('hello')})"));
+        expected.put("d", (Tuple) Util.getPigConstant("('d',0L,{('what')})"));
+        int i = 0;
+        while(it.hasNext()) {
+            Tuple actual = it.next();
+            assertEquals(expected.get(actual.get(0)), actual);
+            i++;
+        }
+        assertEquals(4, i);
+        Util.deleteFile(cluster, "filterbug.data");
+    }
+    
+    // based on jira PIG-739
+    // tests that when a filter inside a foreach filters away all tuples
+    // for a group, an empty bag is still provided to udfs whose
+    // input is the filter
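+    // The trailing group ALL / COUNT step verifies that all 20 groups
+    // survive, including those for which the inner filter and distinct
+    // yield empty bags.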
+    @Test
+    public void testFilterCount3() throws IOException, ParseException {
+        Util.copyFromLocalToCluster(cluster, TEST_FILTER_COUNT3_INPUT, "testdata");
+        String query = "TESTDATA =  load 'testdata' using PigStorage() as (timestamp:chararray, testid:chararray, userid: chararray, sessionid:chararray, value:long, flag:int);" +
+        		"TESTDATA_FILTERED = filter TESTDATA by (timestamp gte '1230800400000' and timestamp lt '1230804000000' and value != 0);" +
+        		"TESTDATA_GROUP = group TESTDATA_FILTERED by testid;" +
+        		"TESTDATA_AGG = foreach TESTDATA_GROUP {" +
+        		"                        A = filter TESTDATA_FILTERED by (userid eq sessionid);" +
+        		"                        C = distinct A.userid;" +
+        		"                        generate group as testid, COUNT(TESTDATA_FILTERED) as counttestdata, COUNT(C) as distcount, SUM(TESTDATA_FILTERED.flag) as total_flags;" +
+        		"                };" +
+        		"TESTDATA_AGG_1 = group TESTDATA_AGG ALL;" +
+        		"TESTDATA_AGG_2 = foreach TESTDATA_AGG_1 generate COUNT(TESTDATA_AGG);" ;
+        Util.registerQuery(pigServer, query);
+        Iterator<Tuple> it = pigServer.openIterator("TESTDATA_AGG_2");
+        
+        int i = 0;
+        while(it.hasNext()) {
+            Tuple actual = it.next();
+            assertEquals(20L, actual.get(0));
+            i++;
+        }
+        assertEquals(1, i);
+        Util.deleteFile(cluster, "testdata");
+    }
+    
+    // test case where RelationToExprProject is present in the
+    // single inner plan of foreach - this will test that it does
+    // send an EOP eventually for each input of the foreach
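+    // The group for col1==3 contains no tuple with col2==1, so the foreach
+    // is expected to emit a tuple holding an empty bag for that group.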
+    @Test
+    public void testFilter1() throws IOException, ParseException {
+        
+        String[] inputData = new String[] {"1\t1\t3","1\t2\t3", "2\t1\t3", "2\t1\t3", "3\t4\t4"};
+        Util.createInputFile(cluster, "test.txt", inputData);
+        String script = "test   = load 'test.txt' as (col1: int, col2: int, col3: int);" +
+                "test2 = group test by col1;" +
+                "test3 = foreach test2 {" +
+                "        filter_one    = filter test by (col2==1);" +
+                "        generate filter_one;};";
+        Util.registerQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("test3");
+        Map<Tuple, Integer> expected = new HashMap<Tuple, Integer>();
+        expected.put((Tuple) Util.getPigConstant("({(1,1,3)})"), 0);
+        expected.put((Tuple) Util.getPigConstant("({(2,1,3),(2,1,3)})"), 0);
+        Tuple t = TupleFactory.getInstance().newTuple();
+        t.append(BagFactory.getInstance().newDefaultBag());
+        expected.put(t, 0);
+        int i = 0;
+        while(it.hasNext()) {
+            if(i == 3) {
+                fail("Got more tuples than expected!");
+            }
+            t = it.next();
+            assertTrue(expected.containsKey(t));
+            int occurrences = expected.get(t);
+            occurrences++;
+            expected.put(t, occurrences);
+            i++;
+        }
+        for (Integer occurrences : expected.values()) {
+            assertEquals(Integer.valueOf(1), occurrences);
+        }
+        Util.deleteFile(cluster, "test.txt");
+    }
+    
+    // test case where RelationToExprProject is present in a
+    // different inner plan along with another plan to project the group
+    // in foreach - this will test that reset() correctly resets
+    // the state that empty bags need to be sent on EOP if no non-EOP
+    // input has been seen on a fresh input from foreach.
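+    // As in testFilter1, the group for col1==3 should yield (3, {}); the
+    // additional 'group' projection exercises reset() across inner plans.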
+    @Test
+    public void testFilter2() throws IOException, ParseException {
+        
+        String[] inputData = new String[] {"1\t1\t3","1\t2\t3", "2\t1\t3", "2\t1\t3", "3\t4\t4"};
+        Util.createInputFile(cluster, "test.txt", inputData);
+        String script = "test   = load 'test.txt' as (col1: int, col2: int, col3: int);" +
+                "test2 = group test by col1;" +
+                "test3 = foreach test2 {" +
+                "        filter_one    = filter test by (col2==1);" +
+                "        generate group, filter_one;};";
+        Util.registerQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("test3");
+        Map<Tuple, Integer> expected = new HashMap<Tuple, Integer>();
+        expected.put((Tuple) Util.getPigConstant("(1,{(1,1,3)})"), 0);
+        expected.put((Tuple) Util.getPigConstant("(2,{(2,1,3),(2,1,3)})"), 0);
+        Tuple t = TupleFactory.getInstance().newTuple();
+        t.append(new Integer(3));
+        t.append(BagFactory.getInstance().newDefaultBag());
+        expected.put(t, 0);
+        int i = 0;
+        while(it.hasNext()) {
+            if(i == 3) {
+                fail("Got more tuples than expected!");
+            }
+            t = it.next();
+            assertTrue(expected.containsKey(t));
+            int occurrences = expected.get(t);
+            occurrences++;
+            expected.put(t, occurrences);
+            i++;
+        }
+        for (Integer occurrences : expected.values()) {
+            assertEquals(Integer.valueOf(1), occurrences);
+        }
+        Util.deleteFile(cluster, "test.txt");
+    }
+
+    
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java Wed Apr 22 00:34:21 2009
@@ -222,11 +222,12 @@
         Util.createInputFile(cluster, "input1.txt", new String[] {"dummy"});
         Util.createInputFile(cluster, "input2.txt", new String[] {"dummy"});
         PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-        Util.registerQuery(pig, "a = load 'input1.txt' ;" +
+        Util.registerQuery(pig, "a = load 'input1.txt';" +
         		"b = load 'input2.txt';" +
         		"c = foreach a generate 1, {(1, 'str1')};" +
         		"d = foreach b generate 2, {(2, 'str2')};" +
-        		"e = union c,d");
+        		"e = union c,d;");
         Iterator<Tuple> it = pig.openIterator("e");
         Object[] expected = new Object[] { Util.getPigConstant("(1, {(1, 'str1')})"),
                 Util.getPigConstant("(2, {(2, 'str2')})")};

Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Wed Apr 22 00:34:21 2009
@@ -17,16 +17,22 @@
  */
 package org.apache.pig.test;
 
+import static java.util.regex.Matcher.quoteReplacement;
+
+import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
+import java.io.FileReader;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import static java.util.regex.Matcher.quoteReplacement;
+
 import junit.framework.Assert;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -38,13 +44,17 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.data.*;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.VisitorException;
 
 public class Util {
     private static BagFactory mBagFactory = BagFactory.getInstance();
@@ -254,6 +264,24 @@
 			Assert.assertEquals(expected, actual);
 		}
 	}
+
+	/**
+	 * Utility method to copy a file from the local filesystem to the DFS on
+	 * the minicluster for testing in mapreduce mode
+	 * @param cluster a reference to the minicluster
+	 * @param localFileName the pathname of the local file
+	 * @param fileNameOnCluster the name with which the file should be created on the minicluster
+	 * @throws IOException
+	 */
+	static public void copyFromLocalToCluster(MiniCluster cluster, String localFileName, String fileNameOnCluster) throws IOException {
+	    BufferedReader reader = new BufferedReader(new FileReader(localFileName));
+	    String line = null;
+	    List<String> contents = new ArrayList<String>();
+	    while((line = reader.readLine()) != null) {
+	        contents.add(line);
+	    }
+	    reader.close();
+	    Util.createInputFile(cluster, fileNameOnCluster, contents.toArray(new String[0]));
+	}
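+
+	// Example usage, mirroring TestRelationToExprProject.testFilterCount3():
+	//   Util.copyFromLocalToCluster(cluster, TEST_FILTER_COUNT3_INPUT, "testdata");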
 	
 	static public void printQueryOutput(Iterator<Tuple> actualResults, 
                Tuple[] expectedResults) {
@@ -342,9 +370,10 @@
     }
     
     public static void registerQuery(PigServer pigServer, String query) throws IOException {
-        String[] queryLines = query.split(";");
-        for (String line : queryLines) {
-            pigServer.registerQuery(line + ";");
-        }
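+        // Write the whole query to a temp script and register it via
+        // registerScript(): the previous approach of splitting on ';' broke
+        // multi-statement queries whose nested foreach blocks contain
+        // semicolons.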
+        File f = File.createTempFile("tmp", "");
+        f.deleteOnExit();
+        PrintWriter pw = new PrintWriter(f);
+        pw.println(query);
+        pw.close();
+        pigServer.registerScript(f.getCanonicalPath());
     }
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld Wed Apr 22 00:34:21 2009
@@ -1,6 +1,6 @@
 New For Each(false)[bag] - Test-Plan-Builder-22
 |   |
-|   Project[bag][*] - Test-Plan-Builder-21
+|   RelationToExpressionProject[bag][*] - Test-Plan-Builder-21
 |   |
 |   |---Filter[bag] - Test-Plan-Builder-17
 |       |   |