Posted to commits@pig.apache.org by pr...@apache.org on 2009/01/27 22:57:22 UTC

svn commit: r738263 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/backend/hadoop/executionengine/phys...

Author: pradeepkth
Date: Tue Jan 27 21:57:21 2009
New Revision: 738263

URL: http://svn.apache.org/viewvc?rev=738263&view=rev
Log:
PIG-636: PERFORMANCE: Use lightweight bag implementations which do not register with SpillableMemoryManager with Combiner

Added:
    hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestDataBagAccess.java
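
The core idea, sketched before the per-file diffs: a default bag from
BagFactory registers itself with the SpillableMemoryManager, which is pure
bookkeeping overhead for the many tiny per-key bags the combiner pipeline
creates. The snippet below is a minimal illustration, not part of the
commit - the class name and scenario are hypothetical, while SingleTupleBag,
DataBag, Tuple and TupleFactory are the Pig types this change touches.

    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.SingleTupleBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class BagChoiceSketch {
        public static DataBag wrapForCombiner(Tuple t) {
            // Before this change: a default bag that registers with the
            // SpillableMemoryManager even though it only ever holds t:
            //   DataBag bag = BagFactory.getInstance().newDefaultBag();
            //   bag.add(t);
            // After: a lightweight bag with no registration and no spill
            // support - safe because exactly one tuple goes into it.
            return new SingleTupleBag(t);
        }

        public static void main(String[] args) throws Exception {
            Tuple t = TupleFactory.getInstance().newTuple(1);
            t.set(0, "hello");
            System.out.println(wrapForCombiner(t)); // prints {(hello)}
        }
    }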

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=738263&r1=738262&r2=738263&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jan 27 21:57:21 2009
@@ -193,17 +193,17 @@
 
     PIG-422: cross is broken (shravanmn via olgan)
 
-    PIG-407: need to clone operators (pradeepk via olgan)
+    PIG-407: need to clone operators (pradeepkth via olgan)
 
     PIG-428: TypeCastInserter does not replace projects in inner plans
-    correctly (pradeepk vi olgan)
+    correctly (pradeepkth via olgan)
 
     PIG-421: error with complex nested plan (sms via olgan)
     
     PIG-429: Self join wth implicit split has the join output in wrong order
-    (pradeepk via olgan)
+    (pradeepkth via olgan)
 
-    PIG-434: short-circuit AND and OR (pradeepk viia olgan)
+    PIG-434: short-circuit AND and OR (pradeepkth via olgan)
 
     PIG-333: allowing no parethesis with single column alias with flatten (sms
     via olgan)
@@ -212,15 +212,15 @@
 
     PIG-426: Adding result of two UDFs gives a syntax error (sms via olgan)
 
-    PIG-436: alias is lost when single column is flattened (pradeepk via
+    PIG-436: alias is lost when single column is flattened (pradeepkth via
     olgan)
 
     PIG-364: Limit return incorrect records when we use multiple reducer
     (daijy via olgan)
 
-    PIG-439: disallow alias renaming (pardeepk via olgan)
+    PIG-439: disallow alias renaming (pradeepkth via olgan)
 
-    PIG-440: Exceptions from UDFs inside a foreach are not captured (pradeepk
+    PIG-440: Exceptions from UDFs inside a foreach are not captured (pradeepkth
     via olgan)
 
     PIG-442: Disambiguated alias after a foreach flatten is not accessible a
@@ -238,18 +238,18 @@
     PIG-445: Null Pointer Exceptions in the mappers leading to lot of retries
     (shravanmn via olgan)
 
-    PIG-444: job.jar is left behined (pradeepk via olgan)
+    PIG-444: job.jar is left behind (pradeepkth via olgan)
 
-    PIG-447: improved error messages (pradeepk via olgan)
+    PIG-447: improved error messages (pradeepkth via olgan)
 
-    PIG-448: explain broken after load with types (pradeepk via olgan)
+    PIG-448: explain broken after load with types (pradeepkth via olgan)
 
     PIG-380: invalid schema for databag constant (sms via olgan)
 
     PIG-451: If an field is part of group followed by flatten, then referring
-    to it causes a parse error (pradeepk via olgan)
+    to it causes a parse error (pradeepkth via olgan)
 
-    PIG-455: "group" alias is lost after a flatten(group) (pradeepk vi olgan)
+    PIG-455: "group" alias is lost after a flatten(group) (pradeepkth vi olgan)
 
     PIG-458: integration with Hadoop 18 (olgan)
 
@@ -262,34 +262,34 @@
     
     PIG-376: set job name (olgan)
 
-    PIG-463: POCast changes (pradeepk via olgan)
+    PIG-463: POCast changes (pradeepkth via olgan)
 
     PIG-427: casting input to UDFs
      
     PIG-437: as in alias names causing problems (sms via olgan)
 
-    PIG-54: MIN/MAX don't deal with invalid data (pradeepk via olgan)
+    PIG-54: MIN/MAX don't deal with invalid data (pradeepkth via olgan)
 
     PIG-470: TextLoader should produce bytearrays (sms via olgan)
 
     PIG-335: lineage (sms vi olgan)
 
-    PIG-464: bag schema definition (pradeepk via olgan)
+    PIG-464: bag schema definition (pradeepkth via olgan)
 
     PIG-457: report 100% on successful jobs only (shravanmn via olgan)
 
-    PIG-471: ignoring status errors from hadoop (pradeepk via olgan)
+    PIG-471: ignoring status errors from hadoop (pradeepkth via olgan)
 
-    PIG-465: performance improvement - removing keys from the value (pradeepk
+    PIG-465: performance improvement - removing keys from the value (pradeepkth
     via olgan)
     
     PIG-489: (*) processing (sms via olgan)
 
     PIG-475: missing heartbeats (shravanmn via olgan)
 
-    PIG-468: make determine Schema work for BinStorage (pradeepk via olgan)
+    PIG-468: make determine Schema work for BinStorage (pradeepkth via olgan)
 
-    PIG-494: invalid handling of UTF-8 data in PigStorage (pradeepk via olgan)
+    PIG-494: invalid handling of UTF-8 data in PigStorage (pradeepkth via olgan)
 
     PIG-501: Make branches/types work under cygwin (daijy via olgan)
 
@@ -304,15 +304,15 @@
 
     PIG-499: parser issue with as (sms via olgan)
 
-    PIG-507: permission error not reported (pradeepk via olgan)
+    PIG-507: permission error not reported (pradeepkth via olgan)
 
-    PIG-508: problem with double joins (pradeepk via olgan)
+    PIG-508: problem with double joins (pradeepkth via olgan)
 
-    PIG-497: problems with UTF8 handling in BinStorage (pradeepk via olgan)
+    PIG-497: problems with UTF8 handling in BinStorage (pradeepkth via olgan)
 
     PIG-505: working with map elements (sms via olgan)
 
-    PIG-517: load functiin with parameters does not work with cast (pradeepk
+    PIG-517: load function with parameters does not work with cast (pradeepkth
     via olgan)
 
     PIG-525: make sure cast for udf parameters works (olgan)
@@ -324,16 +324,16 @@
     PIG-527: allow PigStorage to write out complex output (sms via olgan)
 
     PIG-537: Failure in Hadoop map collect stage due to type mismatch in the
-    keys used in cogroup (pradeepk vi olgan)
+    keys used in cogroup (pradeepkth via olgan)
 
-    PIG-538: support for null constants (pradeepk via olgan)
+    PIG-538: support for null constants (pradeepkth via olgan)
 
-    PIG-385: more null handling (pradeepk via olgan)
+    PIG-385: more null handling (pradeepkth via olgan)
 
     PIG-546: FilterFunc calls empty constructor when it should be calling
     parameterized constructor (sms via olgan)
 
-    PIG-449: Schemas for bags should contain tuples all the time (pradeepk via
+    PIG-449: Schemas for bags should contain tuples all the time (pradeepkth via
     olgan)
 
     PIG-501: make unit tests run under windows (daijy via olgan)
@@ -346,14 +346,14 @@
 
 	PIG-6: Add load support from hbase (hustlmsp via gates).
 
-    PIG-522: make negation work (pradeepk via olgan)
+    PIG-522: make negation work (pradeepkth via olgan)
 
-    PIG-563: support for multiple combiner invocations (pradeepk via olgan)
+    PIG-563: support for multiple combiner invocations (pradeepkth via olgan)
 
-    PIG-580: using combiner to compute distinct aggs (pradeepk via olgan)
+    PIG-580: using combiner to compute distinct aggs (pradeepkth via olgan)
 
     PIG-558: Distinct followed by a Join results in Invalid size 0 for a tuple
-    error (pradeepk via olgan)
+    error (pradeepkth via olgan)
 
 	PIG-572 A PigServer.registerScript() method, which lets a client
 	programmatically register a Pig Script.  (shubhamc via gates)
@@ -364,14 +364,14 @@
 
     PIG-597: Fix for how * is treated by UDFs (shravanmn via olgan)
 
-    PIG-629: performance improvement: getting rid of targeted tuple (pradeepk
+    PIG-629: performance improvement: getting rid of targeted tuple (pradeepkth
     via olgan)
 
     PIG-623: Fix spelling errors in output messages (tomwhite via sms)
 
     PIG-622: Include pig executable in distribution (tomwhite via sms)
 
-    PIG-628: misc performance improvements (pradeepk via olgan)
+    PIG-628: misc performance improvements (pradeepkth via olgan)
     
     PIG-589: error handling, phase 1-2 (sms via olgan)
 
@@ -380,6 +380,9 @@
     PIG-635: POCast.java has incorrect formatting (sms)
 
     PIG-634: When POUnion is one of the roots of a map plan, POUnion.getNext()
-    gives a null pointer exception (pradeepk)
+    gives a null pointer exception (pradeepkth)
 
     PIG-632: Improved error message for binary operators (sms)
+
+    PIG-636: Performance improvement: Use lightweight bag implementations which do not
+    register with SpillableMemoryManager with Combiner (pradeepkth)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=738263&r1=738262&r2=738263&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Tue Jan 27 21:57:21 2009
@@ -466,6 +466,18 @@
                 
             }
         }
+        
+        // since we will only be creating SingleTupleBag as input to
+        // the map foreach, we should flag the POProjects in the map
+        // foreach inner plans to also use SingleTupleBag
+        for (PhysicalPlan mpl : mPlans) {
+            try {
+                new fixMapProjects(mpl).visit();
+            } catch (VisitorException e) {
+                throw new PlanException(e);
+            }
+        }
+
 
         // Set flattens for map and combiner ForEach to false
         List<Boolean> feFlattens = new ArrayList<Boolean>(cPlans.size());
@@ -823,7 +835,47 @@
         }
 
     }
+    
+    private class fixMapProjects extends PhyPlanVisitor {
+
+        public fixMapProjects(PhysicalPlan plan) {
+            this(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+        }
 
+        /**
+         * @param plan
+         * @param walker
+         */
+        public fixMapProjects(PhysicalPlan plan,
+                PlanWalker<PhysicalOperator, PhysicalPlan> walker) {
+            super(plan, walker);
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
+         */
+        @Override
+        public void visitProject(POProject proj) throws VisitorException {
+            if (proj.getResultType() == DataType.BAG) {
+
+                // IMPORTANT ASSUMPTION:
+                // we should be calling this visitor only for
+                // fixing up the projects in the map's foreach
+                // inner plan. In the map side, we are dealing
+                // with single tuple bags - so set the flag in
+                // the project to use single tuple bags. If in
+                // future we don't have single tuple bags in the
+                // input to map's foreach, we should NOT be doing
+                // this!
+                proj.setResultSingleTupleBag(true);
+
+            }
+        }
+
+    }
 
     // Reset any member variables since we may have already visited one
     // combine.

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=738263&r1=738262&r2=738263&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Tue Jan 27 21:57:21 2009
@@ -26,6 +26,7 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.SingleTupleBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -54,6 +55,8 @@
 
 	private static TupleFactory tupleFactory = TupleFactory.getInstance();
 
+	private boolean resultSingleTupleBag = false;
+	
     //The column to project
     ArrayList<Integer> columns;
     
@@ -168,6 +171,7 @@
 
     @Override
     public Result getNext(DataBag db) throws ExecException {
+        
         Result res = processInputBag();
         if(res.returnStatus!=POStatus.STATUS_OK)
             return res;
@@ -179,12 +183,25 @@
             detachInput();
             return res;
         }
-        DataBag outBag = BagFactory.getInstance().newDefaultBag();
-        for (Tuple tuple : inpBag) {
+        
+        DataBag outBag;
+        if(resultSingleTupleBag) {
+            // we have only one tuple in a bag - so create
+            // A SingleTupleBag for the result and fill it
+            // appropriately from the input bag
+            Tuple tuple = inpBag.iterator().next();
             Tuple tmpTuple = tupleFactory.newTuple(columns.size());
             for (int i = 0; i < columns.size(); i++)
                 tmpTuple.set(i, tuple.get(columns.get(i)));
-            outBag.add(tmpTuple);
+            outBag = new SingleTupleBag(tmpTuple);
+        } else {
+            outBag = BagFactory.getInstance().newDefaultBag();
+            for (Tuple tuple : inpBag) {
+                Tuple tmpTuple = tupleFactory.newTuple(columns.size());
+                for (int i = 0; i < columns.size(); i++)
+                    tmpTuple.set(i, tuple.get(columns.get(i)));
+                outBag.add(tmpTuple);
+            }
         }
         res.result = outBag;
         res.returnStatus = POStatus.STATUS_OK;
@@ -371,4 +388,8 @@
         }
     }
 
+    public void setResultSingleTupleBag(boolean resultSingleTupleBag) {
+        this.resultSingleTupleBag = resultSingleTupleBag;
+    }
+
 }
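
To make the new fast path concrete, here is a hedged standalone sketch of
what getNext(DataBag) does once setResultSingleTupleBag(true) has been
called. projectSingle() is a hypothetical helper that mirrors the hunk
above; it is not Pig's exact code.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.SingleTupleBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class SingleTupleProjectSketch {
        static DataBag projectSingle(DataBag inpBag, List<Integer> columns)
                throws Exception {
            // By assumption the input bag holds exactly one tuple, so
            // no loop and no default bag are needed.
            Tuple tuple = inpBag.iterator().next();
            Tuple tmpTuple = TupleFactory.getInstance().newTuple(columns.size());
            for (int i = 0; i < columns.size(); i++)
                tmpTuple.set(i, tuple.get(columns.get(i)));
            return new SingleTupleBag(tmpTuple);
        }

        public static void main(String[] args) throws Exception {
            Tuple t = TupleFactory.getInstance().newTuple(
                    Arrays.asList((Object) "a", "b", "c"));
            System.out.println(projectSingle(new SingleTupleBag(t),
                    Arrays.asList(2, 0))); // prints {(c,a)}
        }
    }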

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=738263&r1=738262&r2=738263&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Tue Jan 27 21:57:21 2009
@@ -31,6 +31,7 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.NonSpillableDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.NullableTuple;
@@ -109,7 +110,7 @@
         //Create numInputs bags
         Object[] fields = new Object[mBags.length];
         for (int i = 0; i < mBags.length; i++) {
-            if (mBags[i]) fields[i] = mBagFactory.newDefaultBag();
+            if (mBags[i]) fields[i] = new NonSpillableDataBag();
         }
         
         // For each indexed tup in the inp, split them up and place their

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=738263&r1=738262&r2=738263&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java Tue Jan 27 21:57:21 2009
@@ -25,6 +25,7 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.SingleTupleBag;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -165,7 +166,6 @@
                 resLst.add(res);
             }
             res.result = constructLROutput(resLst,(Tuple)inp.result);
-            
             return res;
         }
         return inp;
@@ -190,8 +190,7 @@
         // put the value in a bag so that the initial
         // version of the Algebraics will get a bag as
         // they would expect.
-        DataBag bg = mBagFactory.newDefaultBag();
-        bg.add(value);
+        DataBag bg = new SingleTupleBag(value);
         output.set(1, bg);
         return output;
     }
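
The comment in constructLROutput() is the heart of this hunk: the Initial
stage of an Algebraic UDF iterates over a DataBag, so even a lone value
must arrive bag-shaped, and a SingleTupleBag satisfies that contract
cheaply. The hypothetical Initial-stage UDF below (loosely modelled on a
builtin's Initial class, not copied from one) shows the consumer side.

    import java.io.IOException;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class CountInitialSketch extends EvalFunc<Tuple> {
        @Override
        public Tuple exec(Tuple input) throws IOException {
            try {
                // On the map side this bag is now a SingleTupleBag.
                DataBag values = (DataBag) input.get(0);
                return TupleFactory.getInstance().newTuple(
                        (Object) Long.valueOf(values.size()));
            } catch (Exception e) {
                throw new IOException("error in initial stage: " + e.getMessage());
            }
        }
    }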

Added: hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java?rev=738263&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java Tue Jan 27 21:57:21 2009
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+
+
+/**
+ * An unordered collection of Tuples, possibly with duplicates.  The tuples
+ * are stored in an ArrayList, since there is no concern for order or
+ * distinctness. The implicit assumption is that the user of this class
+ * stores only as many tuples as will fit in memory - no spilling
+ * will be done on this bag to disk.
+ */
+public class NonSpillableDataBag implements DataBag {
+
+    /**
+     * serialization version UID
+     */
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * in-memory storage for the tuples in this bag
+     */
+    private List<Tuple> mContents;    
+
+    public NonSpillableDataBag() {
+        mContents = new ArrayList<Tuple>();
+    }
+
+    /**
+     * This constructor creates a bag out of an existing list
+     * of tuples by taking ownership of the list and NOT
+     * copying the contents of the list.
+     * @param listOfTuples List<Tuple> containing the tuples
+     */
+    public NonSpillableDataBag(List<Tuple> listOfTuples) {
+        mContents = listOfTuples;
+    }
+
+    public boolean isSorted() {
+        return false;
+    }
+    
+    public boolean isDistinct() {
+        return false;
+    }
+    
+    public Iterator<Tuple> iterator() {
+        return new NonSpillableDataBagIterator();
+    }
+
+    /**
+     * An iterator that handles getting the next tuple from the bag.
+     */
+    private class NonSpillableDataBagIterator implements Iterator<Tuple> {
+
+        private int mCntr = 0;
+
+        public boolean hasNext() { 
+            return (mCntr < mContents.size());
+        }
+
+        public Tuple next() {
+            // This will report progress every 1024 times through next.
+            // This should be much faster than using mod.
+            if ((mCntr & 0x3ff) == 0) reportProgress();
+
+            return mContents.get(mCntr++);
+        }
+
+        /**
+         * Not implemented.
+         */
+        public void remove() { throw new RuntimeException("Cannot remove() from NonSpillableDataBag.iterator()");}
+    }    
+
+    /**
+     * Report progress to HDFS.
+     */
+    protected void reportProgress() {
+        if (PhysicalOperator.reporter != null) {
+            PhysicalOperator.reporter.progress();
+        }
+    }
+
+    @Override
+    public void add(Tuple t) {
+        mContents.add(t);
+    }
+
+    @Override
+    public void addAll(DataBag b) {
+        for (Tuple t : b) {
+            mContents.add(t);
+        }
+    }
+
+    @Override
+    public void clear() {
+        mContents.clear();        
+    }
+
+    @Override
+    public void markStale(boolean stale) {
+        throw new RuntimeException("NonSpillableDataBag cannot be marked stale");
+    }
+
+    @Override
+    public long size() {
+        return mContents.size();
+    }
+
+    @Override
+    public long getMemorySize() {
+        return 0;
+    }
+
+    @Override
+    public long spill() {
+        // nothing to spill - this bag never registers with the SpillableMemoryManager
+        return 0;
+    }
+
+    /**
+     * Write a bag's contents to disk.
+     * @param out DataOutput to write data to.
+     * @throws IOException (passes it on from underlying calls).
+     */
+    public void write(DataOutput out) throws IOException {
+        // We don't care whether this bag was sorted or distinct because
+        // using the iterator to write it will guarantee those things come
+        // correctly.  And on the other end there'll be no reason to waste
+        // time re-sorting or re-applying distinct.
+        out.writeLong(size());
+        Iterator<Tuple> it = iterator();
+        while (it.hasNext()) {
+            Tuple item = it.next();
+            item.write(out);
+        }    
+    }
+ 
+    /**
+     * Read a bag from disk.
+     * @param in DataInput to read data from.
+     * @throws IOException (passes it on from underlying calls).
+     */
+    public void readFields(DataInput in) throws IOException {
+        long size = in.readLong();
+        
+        for (long i = 0; i < size; i++) {
+            try {
+                Object o = DataReaderWriter.readDatum(in);
+                add((Tuple)o);
+            } catch (ExecException ee) {
+                throw new RuntimeException(ee);
+            }
+        }
+    }
+
+    @Override
+    public int compareTo(Object other) {
+        if (this == other)
+            return 0;
+        if (other instanceof DataBag) {
+            DataBag bOther = (DataBag) other;
+            if (this.size() != bOther.size()) {
+                if (this.size() > bOther.size()) return 1;
+                else return -1;
+            }
+
+            Iterator<Tuple> thisIt = this.iterator();
+            Iterator<Tuple> otherIt = bOther.iterator();
+            while (thisIt.hasNext() && otherIt.hasNext()) {
+                Tuple thisT = thisIt.next();
+                Tuple otherT = otherIt.next();
+                
+                int c = thisT.compareTo(otherT);
+                if (c != 0) return c;
+            }
+            
+            return 0;   // if we got this far, they must be equal
+        } else {
+            return DataType.compare(this, other);
+        }
+    }
+    
+    /**
+     * Write the bag into a string. */
+    @Override
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append('{');
+        Iterator<Tuple> it = iterator();
+        while ( it.hasNext() ) {
+            Tuple t = it.next();
+            String s = t.toString();
+            sb.append(s);
+            if (it.hasNext()) sb.append(",");
+        }
+        sb.append('}');
+        return sb.toString();
+    }
+    
+}
+
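
A short usage sketch for the new class - illustrative, not from the
commit. The List constructor takes ownership of the list without copying
it, so the caller must not mutate the list afterwards.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.NonSpillableDataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class NonSpillableBagSketch {
        public static void main(String[] args) throws Exception {
            List<Tuple> tuples = new ArrayList<Tuple>();
            for (int i = 0; i < 3; i++) {
                Tuple t = TupleFactory.getInstance().newTuple(1);
                t.set(0, Integer.valueOf(i));
                tuples.add(t);
            }
            DataBag bag = new NonSpillableDataBag(tuples); // no copy made
            System.out.println(bag.size()); // 3
            System.out.println(bag);        // {(0),(1),(2)}
        }
    }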

Added: hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java?rev=738263&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java Tue Jan 27 21:57:21 2009
@@ -0,0 +1,170 @@
+/**
+ * 
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * A simple, performant implementation of the DataBag
+ * interface that holds a single tuple. It is used from
+ * POPreCombinerLocalRearrange and wherever else a non-serializable
+ * DataBag holding a single Tuple is required.
+ */
+public class SingleTupleBag implements DataBag {
+    
+    Tuple item;
+
+    public SingleTupleBag(Tuple t) {
+        item = t;
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#add(org.apache.pig.data.Tuple)
+     * NOTE: It is the user's responsibility to ensure only a single
+     * Tuple is ever added into a SingleTupleBag
+     */
+    @Override
+    public void add(Tuple t) {
+        item = t;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#addAll(org.apache.pig.data.DataBag)
+     */
+    @Override
+    public void addAll(DataBag b) {
+        throw new RuntimeException("Cannot create SingleTupleBag from another DataBag");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#clear()
+     */
+    @Override
+    public void clear() {
+        throw new RuntimeException("Cannot clear SingleTupleBag");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#isDistinct()
+     */
+    @Override
+    public boolean isDistinct() {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#isSorted()
+     */
+    @Override
+    public boolean isSorted() {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#iterator()
+     */
+    @Override
+    public Iterator<Tuple> iterator() {
+        return new TBIterator();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#markStale(boolean)
+     */
+    @Override
+    public void markStale(boolean stale) {
+        throw new RuntimeException("SingleTupleBag cannot be marked stale");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.DataBag#size()
+     */
+    @Override
+    public long size() {
+        return 1;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.util.Spillable#getMemorySize()
+     */
+    @Override
+    public long getMemorySize() {
+        return 0;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.util.Spillable#spill()
+     */
+    @Override
+    public long spill() {
+        return 0;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        // this bag is transient map-side state and is never written out
+        throw new IOException("SingleTupleBag should never be serialized or deserialized");
+
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        // this bag is transient map-side state and is never written out
+        throw new IOException("SingleTupleBag should never be serialized or deserialized");
+    }
+
+    /* (non-Javadoc)
+     * @see java.lang.Comparable#compareTo(java.lang.Object)
+     */
+    @Override
+    public int compareTo(Object o) {
+        // not implemented - this bag always compares as equal
+        return 0;
+    }
+
+    class TBIterator implements Iterator<Tuple> {
+        boolean nextDone = false;
+        /* (non-Javadoc)
+         * @see java.util.Iterator#hasNext()
+         */
+        @Override
+        public boolean hasNext() {
+            return !nextDone;
+        }
+        
+        /* (non-Javadoc)
+         * @see java.util.Iterator#next()
+         */
+        @Override
+        public Tuple next() {
+            nextDone = true;
+            return item;
+            
+        }
+        
+        /* (non-Javadoc)
+         * @see java.util.Iterator#remove()
+         */
+        @Override
+        public void remove() {
+            throw new RuntimeException("SingleTupleBag.iterator().remove() is not allowed");    
+        }
+    }
+    
+    /**
+     * Write the bag into a string. */
+    @Override
+    public String toString() {
+        return "{" + item + "}";
+    }
+}
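
One behavioural point worth a tiny illustration (again a hypothetical
snippet): as the NOTE on add() says, a SingleTupleBag holds exactly one
tuple, so a second add() replaces the held tuple rather than appending.

    import org.apache.pig.data.SingleTupleBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class SingleTupleBagContractSketch {
        public static void main(String[] args) {
            Tuple first = TupleFactory.getInstance().newTuple((Object) "first");
            Tuple second = TupleFactory.getInstance().newTuple((Object) "second");
            SingleTupleBag bag = new SingleTupleBag(first);
            bag.add(second);                // replaces, does not append
            System.out.println(bag.size()); // always 1
            System.out.println(bag);        // {(second)}
        }
    }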

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestDataBagAccess.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestDataBagAccess.java?rev=738263&r1=738262&r2=738263&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestDataBagAccess.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestDataBagAccess.java Tue Jan 27 21:57:21 2009
@@ -17,6 +17,9 @@
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.SingleTupleBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.MultiMap;
 import org.junit.Before;
@@ -39,6 +42,35 @@
     }
     
     @Test
+    public void testSingleTupleBagAccess() throws Exception {
+        Tuple inputTuple = new DefaultTuple();
+        inputTuple.append("a");
+        inputTuple.append("b");
+        
+        SingleTupleBag bg = new SingleTupleBag(inputTuple);
+        Iterator<Tuple> it = bg.iterator();
+        assertEquals(inputTuple, it.next());
+        assertFalse(it.hasNext());
+    }
+    
+    @Test
+    public void testNonSpillableDataBag() throws Exception {
+        String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
+        NonSpillableDataBag bg = new NonSpillableDataBag();
+        for (int i = 0; i < tupleContents.length; i++) {
+            bg.add(Util.createTuple(tupleContents[i]));
+        }
+        Iterator<Tuple> it = bg.iterator();
+        int j = 0;
+        while(it.hasNext()) {
+            Tuple t = it.next();
+            assertEquals(Util.createTuple(tupleContents[j]), t);
+            j++;
+        }
+        assertEquals(tupleContents.length, j);
+    }
+    
+    @Test
     public void testBagConstantAccess() throws IOException, ExecException {
         File input = Util.createInputFile("tmp", "", 
                 new String[] {"sampledata\tnot_used"});