Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/04 23:58:23 UTC

svn commit: r609048 [1/2] - in /incubator/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/ src/org/apache/pig/impl/eval/collec...

Author: gates
Date: Fri Jan  4 14:58:20 2008
New Revision: 609048

URL: http://svn.apache.org/viewvc?rev=609048&view=rev
Log:
PIG-30: Rewrote DataBags to better handle decisions of when to spill to
    disk and to spill more intelligently.


Added:
    incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
    incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
    incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
    incubator/pig/trunk/src/org/apache/pig/impl/util/PigLogger.java
    incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
    incubator/pig/trunk/test/org/apache/pig/test/TestDataBag.java
Removed:
    incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java
Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/PigServer.java
    incubator/pig/trunk/src/org/apache/pig/builtin/AVG.java
    incubator/pig/trunk/src/org/apache/pig/builtin/COUNT.java
    incubator/pig/trunk/src/org/apache/pig/builtin/DIFF.java
    incubator/pig/trunk/src/org/apache/pig/builtin/IsEmpty.java
    incubator/pig/trunk/src/org/apache/pig/builtin/MAX.java
    incubator/pig/trunk/src/org/apache/pig/builtin/MIN.java
    incubator/pig/trunk/src/org/apache/pig/builtin/SUM.java
    incubator/pig/trunk/src/org/apache/pig/data/BagFactory.java
    incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java
    incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
    incubator/pig/trunk/src/org/apache/pig/data/DataMap.java
    incubator/pig/trunk/src/org/apache/pig/data/Datum.java
    incubator/pig/trunk/src/org/apache/pig/data/Tuple.java
    incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java
    incubator/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
    incubator/pig/trunk/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java
    incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java
    incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java
    incubator/pig/trunk/src/org/apache/pig/impl/io/PigFile.java
    incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
    incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigCombine.java
    incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java
    incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/IntermedResult.java
    incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POCogroup.java
    incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java
    incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PORead.java
    incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POSort.java
    incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POStore.java
    incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java
    incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
    incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
    incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java
    incubator/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
    incubator/pig/trunk/test/org/apache/pig/test/TestMapReduce.java
    incubator/pig/trunk/test/org/apache/pig/test/TestPigFile.java
    incubator/pig/trunk/test/org/apache/pig/test/Util.java

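Most of the diffs below are mechanical API renames: DataBag.content() becomes
iterator(), DataBag.cardinality() becomes size() (which now returns a long),
and bags are obtained from BagFactory methods instead of constructed directly.
As a rough, hypothetical sketch of what caller code looks like after the
migration (the helper name and the bag-in-first-field layout are assumptions,
mirroring the builtin functions below):

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    public class BagApiMigration {
        // Hypothetical helper; counts the tuples in a bag-valued field.
        static long countTuples(Tuple input) throws IOException {
            DataBag bag = input.getBagField(0);
            long n = 0;
            Iterator<Tuple> it = bag.iterator();  // was: bag.content()
            while (it.hasNext()) {
                it.next();
                n++;
            }
            // was: bag.cardinality(), which returned an int
            assert n == bag.size();
            return n;
        }
    }
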
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Fri Jan  4 14:58:20 2008
@@ -58,3 +58,7 @@
     PIG-41: Added patterns to svn:ignore
 
     PIG-51: Fixed combiner in the presence of flattening
+
+	PIG-30: Rewrote DataBags to better handle decisions of when to spill to
+	disk and to spill more intelligently. (gates)
+

Modified: incubator/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/PigServer.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/PigServer.java Fri Jan  4 14:58:20 2008
@@ -57,6 +57,7 @@
 import org.apache.pig.impl.physicalLayer.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.POVisitor;
 import org.apache.pig.impl.physicalLayer.POPrinter;
+import org.apache.pig.impl.util.PigLogger;
 
 
 
@@ -172,7 +173,7 @@
             	logMessage += (logMessage + urls.nextElement() + "; ");
             }
             
-            pigContext.getLogger().debug(logMessage);
+            PigLogger.getLogger().debug(logMessage);
         }
     
         return resourceLocation;
@@ -302,7 +303,7 @@
        		pp = physicalPlans.get(readFrom);
     	}
     	
-    	return pp.exec(continueFromLast).content();
+    	return pp.exec(continueFromLast).iterator();
     	
     }
     
@@ -319,7 +320,7 @@
         readFrom.compile(queryResults);
         readFrom.exec();
         if (pigContext.getExecType() == ExecType.LOCAL)
-            return readFrom.read().content();
+            return readFrom.read().iterator();
         final LoadFunc p;
         
         try{
@@ -534,7 +535,7 @@
         stream.println("Logical Plan:");
         IntermedResult ir = queryResults.get(alias);
         if (ir == null) {
-            pigContext.getLogger().error("Invalid alias: " + alias);
+            PigLogger.getLogger().error("Invalid alias: " + alias);
             throw new IOException("Invalid alias: " + alias);
         }
 

Modified: incubator/pig/trunk/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/AVG.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/AVG.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/AVG.java Fri Jan  4 14:58:20 2008
@@ -106,7 +106,7 @@
         double sum = 0;
         double count = 0;
 
-        for (Iterator it = values.content(); it.hasNext();) {
+        for (Iterator it = values.iterator(); it.hasNext();) {
             Tuple t = (Tuple) it.next();
 //            if(!(t.getField(0) instanceof DataAtom)) {
 //                throw new RuntimeException("Unexpected Type: " + t.getField(0).getClass().getName() + " in " + t);
@@ -124,14 +124,14 @@
         DataBag values = input.getBagField(0);
 
         
-        return values.cardinality();
+        return values.size();
     }
 
     static protected double sum(Tuple input) throws IOException {
         DataBag values = input.getBagField(0);
 
         double sum = 0;
-        for (Iterator it = values.content(); it.hasNext();) {
+        for (Iterator it = values.iterator(); it.hasNext();) {
             Tuple t = (Tuple) it.next();
             sum += t.getAtomField(0).numval();
         }

Modified: incubator/pig/trunk/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/COUNT.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/COUNT.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/COUNT.java Fri Jan  4 14:58:20 2008
@@ -77,7 +77,7 @@
     static protected long count(Tuple input) throws IOException {
         Datum values = input.getField(0);        
         if (values instanceof DataBag)
-        	return ((DataBag)values).cardinality();
+        	return ((DataBag)values).size();
         else if (values instanceof DataMap)
         	return ((DataMap)values).cardinality();
         else
@@ -87,7 +87,7 @@
     static protected long sum(Tuple input) throws IOException {
         DataBag values = input.getBagField(0);
         long sum = 0;
-        for (Iterator<Tuple> it = values.content(); it.hasNext();) {
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
             Tuple t = it.next();
             try {
                 sum += t.getAtomField(0).longVal();

Modified: incubator/pig/trunk/src/org/apache/pig/builtin/DIFF.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/DIFF.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/DIFF.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/DIFF.java Fri Jan  4 14:58:20 2008
@@ -48,9 +48,9 @@
         if (input.getField(0) instanceof DataBag) {
             DataBag field1 = input.getBagField(0);
             DataBag field2 = input.getBagField(1);
-            Iterator<Tuple> it1 = field1.content();
+            Iterator<Tuple> it1 = field1.iterator();
             checkInBag(field2, it1, output);
-            Iterator<Tuple> it2 = field2.content();
+            Iterator<Tuple> it2 = field2.iterator();
             checkInBag(field1, it2, output);
         } else {
             DataAtom d1 = input.getAtomField(0);
@@ -65,7 +65,7 @@
     private void checkInBag(DataBag bag, Iterator<Tuple> iterator, DataBag emitTo) throws IOException {
         while(iterator.hasNext()) {
             Tuple t = iterator.next();
-            Iterator<Tuple> it2 = bag.content();
+            Iterator<Tuple> it2 = bag.iterator();
             boolean found = false;
             while(it2.hasNext()) {
                 if (t.equals(it2.next())) {

Modified: incubator/pig/trunk/src/org/apache/pig/builtin/IsEmpty.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/IsEmpty.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/IsEmpty.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/IsEmpty.java Fri Jan  4 14:58:20 2008
@@ -32,7 +32,7 @@
     public boolean exec(Tuple input) throws IOException {
     	Datum values = input.getField(0);        
         if (values instanceof DataBag)
-        	return ((DataBag)values).cardinality() == 0;
+        	return ((DataBag)values).size() == 0;
         else if (values instanceof DataMap)
         	return ((DataMap)values).cardinality() == 0;
         else

Modified: incubator/pig/trunk/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/MAX.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/MAX.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/MAX.java Fri Jan  4 14:58:20 2008
@@ -68,7 +68,7 @@
 		DataBag values = input.getBagField(0);
 
 		double curMax = Double.NEGATIVE_INFINITY;
-		for (Iterator it = values.content(); it.hasNext();) {
+		for (Iterator it = values.iterator(); it.hasNext();) {
 			Tuple t = (Tuple) it.next();
 			try {
 				curMax = java.lang.Math.max(curMax, t.getAtomField(0).numval());

Modified: incubator/pig/trunk/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/MIN.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/MIN.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/MIN.java Fri Jan  4 14:58:20 2008
@@ -68,7 +68,7 @@
 		DataBag values = input.getBagField(0);
 
 		double curMin = Double.POSITIVE_INFINITY;
-		for (Iterator it = values.content(); it.hasNext();) {
+		for (Iterator it = values.iterator(); it.hasNext();) {
 			Tuple t = (Tuple) it.next();
 			try {
 				curMin = java.lang.Math.min(curMin, t.getAtomField(0).numval());

Modified: incubator/pig/trunk/src/org/apache/pig/builtin/SUM.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/SUM.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/SUM.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/SUM.java Fri Jan  4 14:58:20 2008
@@ -70,13 +70,13 @@
         double sum = 0;
 	int i = 0;
         Tuple t = null;
-        for (Iterator it = values.content(); it.hasNext();) {
+        for (Iterator it = values.iterator(); it.hasNext();) {
             try {
             t = (Tuple) it.next();
 	    i++;
             sum += t.getAtomField(0).numval();
             }catch(RuntimeException exp) {
-		String msg = "iteration = " + i + "bag size = " + values.cardinality() + " partial sum = " + sum + "\n";
+		String msg = "iteration = " + i + " bag size = " + values.size() + " partial sum = " + sum + "\n";
 		if (t != null)
 			msg += "previous tupple = " + t.toString();
 		throw new RuntimeException(exp.getMessage() + " additional info: " + msg);

Modified: incubator/pig/trunk/src/org/apache/pig/data/BagFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/BagFactory.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/BagFactory.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/BagFactory.java Fri Jan  4 14:58:20 2008
@@ -17,43 +17,58 @@
  */
 package org.apache.pig.data;
 
-import java.io.File;
-import java.io.IOException;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.util.SpillableMemoryManager;
 
+/**
+ * A bag factory.  Can be used to generate different types of bags
+ * depending on what is needed.
+ */
 public class BagFactory {
+    private static BagFactory gSelf;
+    private static SpillableMemoryManager gMemMgr;
 
-    private File              tmpdir;
-    private static BagFactory instance = new BagFactory();
-
-    static{
-    	init(new File(System.getProperty("java.io.tmpdir")));
-    }
+    static { gSelf = new BagFactory(); }
+    
+    /**
+     * Get a reference to the singleton factory.
+     */
     public static BagFactory getInstance() {
-        return instance;
-    }
-
-    private BagFactory() {
-    }
-
-    public static void init(File tmpdir) {
-        instance.setTmpDir(tmpdir);
-    }
-
-    private void setTmpDir(File tmpdir) {
-        this.tmpdir = tmpdir;
-        this.tmpdir.mkdirs();
+        return gSelf;
     }
     
-    // Get BigBag or Bag, depending on whether the temp directory has been set up
-    public DataBag getNewBag() throws IOException {
-        if (tmpdir == null) return new DataBag();
-        else return getNewBigBag();
+    /**
+     * Get a default (unordered, not distinct) data bag.
+     */
+    public DataBag newDefaultBag() {
+        DataBag b = new DefaultDataBag();
+        gMemMgr.registerSpillable(b);
+        return b;
+    }
+
+    /**
+     * Get a sorted data bag.
+     * @param spec EvalSpec that controls how the data is sorted.
+     * If null, default comparator will be used.
+     */
+    public DataBag newSortedBag(EvalSpec spec) {
+        DataBag b = new SortedDataBag(spec);
+        gMemMgr.registerSpillable(b);
+        return b;
     }
     
-    // Need a Big Bag, dammit!
-    public BigDataBag getNewBigBag() throws IOException {
-        if (tmpdir == null) throw new IOException("No temp directory given for BigDataBag.");
-        else return new BigDataBag(tmpdir);
+    /**
+     * Get a distinct data bag.
+     */
+    public DataBag newDistinctBag() {
+        DataBag b = new DistinctDataBag();
+        gMemMgr.registerSpillable(b);
+        return b;
+    }
+
+    private BagFactory() {
+        gMemMgr = new SpillableMemoryManager();
     }
 
 }
+

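Every bag the factory hands out is registered with the SpillableMemoryManager,
which can later ask the bag to spill to disk. A minimal sketch of the three new
entry points (illustrative fragment; per the javadoc above, passing null to
newSortedBag falls back to the default comparator):

    BagFactory f = BagFactory.getInstance();
    DataBag plain    = f.newDefaultBag();      // unordered, keeps duplicates
    DataBag sorted   = f.newSortedBag(null);   // ordered by default comparator
    DataBag distinct = f.newDistinctBag();     // duplicates dropped on add
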
Modified: incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java Fri Jan  4 14:58:20 2008
@@ -153,4 +153,13 @@
         return stringVal.hashCode();
     }
 
+    @Override
+    public long getMemorySize() {
+        long used = 0;
+        if (stringVal != null) used += stringVal.length() * 2 + OBJECT_SIZE;
+        if (doubleVal != null) used += 8 + OBJECT_SIZE;
+        if (binaryVal != null) used += binaryVal.length + OBJECT_SIZE;
+        used += OBJECT_SIZE + 3 * REF_SIZE;
+        return used;
+    }
 }

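To make the accounting above concrete: with OBJECT_SIZE = 8 and REF_SIZE = 4
(defined in Datum.java further down), a DataAtom holding only the
five-character string "hello" reports 5 * 2 + 8 = 18 bytes for the string plus
8 + 3 * 4 = 20 bytes for the atom object and its three field references, 38
bytes in all. The constants are rough stand-ins for JVM object and reference
overhead, so the result is an estimate, not a measurement.
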
Modified: incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataBag.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DataBag.java Fri Jan  4 14:58:20 2008
@@ -17,263 +17,308 @@
  */
 package org.apache.pig.data;
 
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.Iterator;
-import java.util.List;
+import java.util.ArrayList;
 
-import org.apache.pig.impl.eval.EvalSpec;
-import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.util.Spillable;
 import org.apache.pig.impl.mapreduceExec.PigMapReduce;
 
-
 /**
  * A collection of Tuples
  */
-public class DataBag extends Datum{
-    protected List<Tuple> content;
-    protected boolean isSorted = false;
-    
-    public DataBag() {
-        content = new ArrayList<Tuple>();
-    }
+public abstract class DataBag extends Datum implements Spillable {
+    // Container that holds the tuples. Actual object instantiated by
+    // subclasses.
+    protected Collection<Tuple> mContents;
 
-    public DataBag(List<Tuple> c) {
-        content = c;
-    }
+    // Spill files we've created.  These need to be removed in finalize.
+    protected ArrayList<File> mSpillFiles;
 
-    public DataBag(Tuple t) {
-        content = new ArrayList<Tuple>();
-        content.add(t);
-    }
+    // Total size, including tuples on disk.  Stored here so we don't have
+    // to run through the disk when people ask.
+    protected long mSize = 0;
 
-    public int cardinality() {
-        return content.size();
-    }
+    protected boolean mMemSizeChanged = false;
 
-    public boolean isEmpty() {
-        return content.size() == 0;
-    }
-    
-    public int compareTo(Object other) {
-    	if (this == other)
-    		return 0;
-    	if (other instanceof DataAtom) return +1;
-        if (other instanceof Tuple) return -1;
-        if (other instanceof DataBag){
-        	DataBag bOther = (DataBag) other;
-	        if (this.cardinality() != bOther.cardinality()) {
-	            return (this.cardinality() - bOther.cardinality());
-	        }
-	        
-	        // same cardinality, so compare tuple by tuple ...
-	        if (!isSorted())
-	        	this.sort();
-	        if (!bOther.isSorted())
-	        	bOther.sort();
-	        
-	        Iterator<Tuple> thisIt = this.content();
-	        Iterator<Tuple> otherIt = bOther.content();
-	        while (thisIt.hasNext() && otherIt.hasNext()) {
-	            Tuple thisT = thisIt.next();
-	            Tuple otherT = otherIt.next();
-	            
-	            int c = thisT.compareTo(otherT);
-	            if (c != 0) return c;
-	        }
-	        
-	        return 0;   // if we got this far, they must be equal
-        }else{
-        	return -1;
-        }
-	        	
-    }
-    
-    @Override
-	public boolean equals(Object other) {
-        return (compareTo(other) == 0);
+    protected long mMemSize = 0;
+
+    /**
+     * Get the number of elements in the bag, both in memory and on disk.
+     */
+    public long size() {
+        return mSize;
     }
-    
-    public void sort() {
-        Collections.sort(content);
+
+    /**
+     * Deprecated.  Use size() instead.
+     */
+    @Deprecated
+    public int cardinality() {
+        return (int)size();
     }
     
-    public void sort(EvalSpec spec) {
-        Collections.sort(content, spec.getComparator());
-        isSorted = true;
-    }
+    /**
+     * Find out if the bag is sorted.
+     */
+    public abstract boolean isSorted();
     
-    public void arrange(EvalSpec spec) {
-        sort(spec);
-        isSorted = true;
-    }
+    /**
+     * Find out if the bag is distinct.
+     */
+    public abstract boolean isDistinct();
     
-    public void distinct() {
-        
-        Collections.sort(content);
-        isSorted = true;
-        int curCall = 0;
-        
-        Tuple lastTup = null;
-        for (Iterator<Tuple> it = content.iterator(); it.hasNext(); ) {
-            if (curCall < notifyInterval - 1)
-                curCall++;
-            else
-            {
-                    if (PigMapReduce.reporter != null)
-                        PigMapReduce.reporter.progress(); 
-                    curCall = 0;
-            }
-            Tuple thisTup = it.next();
-            
-            if (lastTup == null) {
-                lastTup = thisTup;
-                continue;
-            }
-            
-            if (thisTup.compareTo(lastTup) == 0) {
-                it.remove();
-            } else {
-                lastTup = thisTup;
-            }
-        }
-    }
-
-    public static int notifyInterval = 1000;
-    public int numNotifies; // used for unit tests only
+    /**
+     * Get an iterator to the bag. For default and distinct bags,
+     * no particular order is guaranteed. For sorted bags the
+     * iteration order is guaranteed to follow the provided comparator.
+     */
+    public abstract Iterator<Tuple> iterator();
 
+    /**
+     * Deprecated.  Use iterator() instead.
+     */
+    @Deprecated
     public Iterator<Tuple> content() {
-        return new Iterator<Tuple>() {
-             Iterator<Tuple> myIt;
-             int curCall;
-
-            {
-                numNotifies = 0;
-                myIt = content.iterator();
-
-            }
-            public final boolean hasNext(){
-                return myIt.hasNext();
-            }
-            public final Tuple next(){
-                if (curCall < notifyInterval - 1)
-                    curCall ++;
-                else{
-                    if (PigMapReduce.reporter != null)
-                        PigMapReduce.reporter.progress(); 
-                    numNotifies ++;
-                    curCall = 0;
-                } 
-                return myIt.next();
-            }
-            public final void remove(){
-                myIt.remove();
-            }
-        };
+        return iterator();
     }
     
-
+    /**
+     * Add a tuple to the bag.
+     * @param t tuple to add.
+     */
     public void add(Tuple t) {
-        if (t!=null)
-        	content.add(t);
+        synchronized (mContents) {
+            mMemSizeChanged = true;
+            mSize++;
+            mContents.add(t);
+        }
     }
 
+    /**
+     * Add contents of a bag to the bag.
+     * @param b bag to add contents of.
+     */
     public void addAll(DataBag b) {
-        
-        Iterator<Tuple> it = b.content();
-        while (it.hasNext()) {
-            add(it.next());
+        synchronized (mContents) {
+            mMemSizeChanged = true;
+            mSize += b.size();
+            Iterator<Tuple> i = b.iterator();
+            while (i.hasNext()) mContents.add(i.next());
         }
     }
 
-    public void remove(Tuple d) {
-        content.remove(d);
-    }
-
+    // Do I need remove? I couldn't find it used anywhere.
+    
     /**
-     * Returns the value of field i. Since there may be more than one tuple in the bag, this
-     * function throws an exception if it is not the case that all tuples agree on this field
+     * Return the size of memory usage.
      */
-    public DataAtom getField(int i) throws IOException {
-        DataAtom val = null;
+    @Override
+    public long getMemorySize() {
+        if (!mMemSizeChanged) return mMemSize;
+
+        long used = 0;
+        // I can't afford to walk through all the tuples every time the
+        // memory manager wants to know if it's time to dump.  Just sample
+        // the first 100 and see what we get.  This may not be 100%
+        // accurate, but it's just an estimate anyway.
+        int j;
+        int numInMem = 0;
+        synchronized (mContents) {
+            numInMem = mContents.size();
+            // Measure only what's in memory, not what's on disk.
+            Iterator<Tuple> i = mContents.iterator();
+            for (j = 0; i.hasNext() && j < 100; j++) { 
+                used += i.next().getMemorySize();
+                used += REF_SIZE;
+            }
+        }
+
+        if (numInMem > 100) {
+            // Estimate the per tuple size.  Do it in integer arithmetic
+            // (even though it will be slightly less accurate) for speed.
+            used /= j;
+            used *= numInMem;
+        }
 
-        for (Iterator<Tuple> it = content(); it.hasNext();) {
-            DataAtom currentVal = it.next().getAtomField(i);
+        mMemSize = used;
+        mMemSizeChanged = false;
+        return used;
+    }
 
-            if (val == null) {
-                val = currentVal;
-            } else {
-                if (!val.strval().equals(currentVal.strval()))
-                    throw new IOException("Cannot call getField on a databag unless all tuples agree.");
+    /**
+     * Clear out the contents of the bag, both on disk and in memory.
+     * Any attempts to read after this is called will produce undefined
+     * results.
+     */
+    public void clear() {
+        synchronized (mContents) {
+            mContents.clear();
+            if (mSpillFiles != null) {
+                for (int i = 0; i < mSpillFiles.size(); i++) {
+                    mSpillFiles.get(i).delete();
+                }
+                mSpillFiles.clear();
             }
+            mSize = 0;
         }
+    }
 
-        if (val == null)
-            throw new IOException("Cannot call getField on an empty databag.");
+    public int compareTo(Object other) {
+        // Do we really need to be able to compare to DataAtom and Tuple?
+        // When does that happen?
+        if (this == other)
+            return 0;
+        if (other instanceof DataBag){
+            DataBag bOther = (DataBag) other;
+            if (this.size() != bOther.size()) {
+                if (this.size() > bOther.size()) return 1;
+                else return -1;
+            }
 
-        return val;
+            // Don't sort them, just go tuple by tuple.
+            Iterator<Tuple> thisIt = this.iterator();
+            Iterator<Tuple> otherIt = bOther.iterator();
+            while (thisIt.hasNext() && otherIt.hasNext()) {
+                Tuple thisT = thisIt.next();
+                Tuple otherT = otherIt.next();
+                
+                int c = thisT.compareTo(otherT);
+                if (c != 0) return c;
+            }
+            
+            return 0;   // if we got this far, they must be equal
+        } else if (other instanceof DataAtom) {
+            return +1;
+        } else if (other instanceof Tuple) {
+            return -1;
+        } else {
+            return -1;
+        }
     }
 
-    public void clear(){
-    	content.clear();
-    	isSorted = false;
+    public boolean equals(Object other) {
+        return compareTo(other) == 0;
     }
 
-    
+    /**
+     * Write a bag's contents to disk.
+     * @param out DataOutput to write data to.
+     * @throws IOException (passes it on from underlying calls).
+     */
     @Override
-	public void write(DataOutput out) throws IOException {
-    	 out.write(BAG);
-         Tuple.encodeInt(out, cardinality());
-         Iterator<Tuple> it = content();
-         while (it.hasNext()) {
-             Tuple item = it.next();
-             item.write(out);
-         }	
+    public void write(DataOutput out) throws IOException {
+        // We don't care whether this bag was sorted or distinct because
+        // using the iterator to write it guarantees those properties come
+        // out correctly.  And on the other end there'll be no reason to waste
+        // time re-sorting or re-applying distinct.
+        out.write(BAG);
+        out.writeLong(size());
+        Iterator<Tuple> it = iterator();
+        while (it.hasNext()) {
+            Tuple item = it.next();
+            item.write(out);
+        }    
     }
-    
-    public static abstract class BagDelimiterTuple extends Tuple{}
-    public static class StartBag extends BagDelimiterTuple{}
-    
-    public static class EndBag extends BagDelimiterTuple{}
-    
-    public static final Tuple startBag = new StartBag();
-    public static final Tuple endBag = new EndBag();
-    
+ 
+    /**
+     * Read a bag from disk.
+     * @param in DataInput to read data from.
+     * @throws IOException (passes it on from underlying calls).
+     */
     static DataBag read(DataInput in) throws IOException {
-        int size = Tuple.decodeInt(in);
-        DataBag ret = BagFactory.getInstance().getNewBag();
+        long size = in.readLong();
+        // Always use a default data bag, as if it was sorted or distinct
+        // we're guaranteed it was written out that way already, and we
+        // don't need to mess with it.
+        DataBag ret = BagFactory.getInstance().newDefaultBag();
         
-        for (int i = 0; i < size; i++) {
+        for (long i = 0; i < size; i++) {
             Tuple t = new Tuple();
             t.readFields(in);
             ret.add(t);
         }
         return ret;
     }
-    
-    public void markStale(boolean stale){}
-    
+ 
+    /**
+     * This is used by FuncEvalSpec.FakeDataBag.
+     * @param stale Set stale state.
+     */
+    public void markStale(boolean stale)
+    {
+    }
+
+    /**
+     * Write the bag into a string.
+     */
     @Override
-	public String toString() {
+    public String toString() {
         StringBuffer sb = new StringBuffer();
         sb.append('{');
-        Iterator<Tuple> it = content();
+        Iterator<Tuple> it = iterator();
         while ( it.hasNext() ) {
-        	Tuple t = it.next();
-        	String s = t.toString();
-        	sb.append(s);
-            if (it.hasNext())
-                sb.append(", ");
+            Tuple t = it.next();
+            String s = t.toString();
+            sb.append(s);
+            if (it.hasNext()) sb.append(", ");
         }
         sb.append('}');
         return sb.toString();
     }
-    
-    public boolean isSorted(){
-    	return isSorted;
+
+    /**
+     * Need to override finalize to clean out the mSpillFiles array.
+     */
+    @Override
+    protected void finalize() {
+        if (mSpillFiles != null) {
+            for (int i = 0; i < mSpillFiles.size(); i++) {
+                mSpillFiles.get(i).delete();
+            }
+        }
     }
+
+    /**
+     * Get a file to spill contents to.  The file will be registered in the
+     * mSpillFiles array.
+     * @return stream to write tuples to.
+     */
+    protected DataOutputStream getSpillFile() throws IOException {
+        if (mSpillFiles == null) {
+            // We want to keep the list as small as possible.
+            mSpillFiles = new ArrayList<File>(1);
+        }
+
+        File f = File.createTempFile("pigbag", null);
+        f.deleteOnExit();
+        mSpillFiles.add(f);
+        return new DataOutputStream(new BufferedOutputStream(
+            new FileOutputStream(f)));
+    }
+
+    /**
+     * Report progress to HDFS.
+     */
+    protected void reportProgress() {
+        if (PigMapReduce.reporter != null) {
+            PigMapReduce.reporter.progress();
+        }
+    }
+
+    public static abstract class BagDelimiterTuple extends Tuple{}
+    public static class StartBag extends BagDelimiterTuple{}
+    
+    public static class EndBag extends BagDelimiterTuple{}
     
+    public static final Tuple startBag = new StartBag();
+    public static final Tuple endBag = new EndBag();
+
+    protected static final int MAX_SPILL_FILES = 100;
+ 
 }

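The sampling shortcut in getMemorySize() above is worth a worked example: only
the first 100 in-memory tuples are measured, then the average is scaled up to
the whole in-memory population using integer arithmetic. In isolation (all
figures invented for illustration):

    // Mirrors the arithmetic in DataBag.getMemorySize(); numbers are made up.
    long used     = 6400;    // bytes measured in the first 100 tuples + refs
    int  j        = 100;     // tuples actually sampled
    int  numInMem = 50000;   // tuples currently held in memory
    used /= j;               // 64 bytes per tuple (integer division)
    used *= numInMem;        // estimate: 3,200,000 bytes in memory
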
Modified: incubator/pig/trunk/src/org/apache/pig/data/DataMap.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataMap.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DataMap.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DataMap.java Fri Jan  4 14:58:20 2008
@@ -24,6 +24,9 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Iterator;
+import java.lang.String;
+
 
 public class DataMap extends Datum {
 
@@ -141,5 +144,19 @@
 	public Map<String, Datum> content(){
 		return content;
 	}
+
+    @Override
+    public long getMemorySize() {
+        long used = 0;
+        Iterator<Map.Entry<String, Datum> > i = content.entrySet().iterator();
+        while (i.hasNext()) {
+            Map.Entry<String, Datum> e = i.next();
+            used += e.getKey().length() * 2 + OBJECT_SIZE + REF_SIZE;
+            used += e.getValue().getMemorySize() + REF_SIZE;
+        }
+
+        used += 2 * OBJECT_SIZE + REF_SIZE;
+        return used;
+    }
 
 }

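Same estimation style as DataAtom: a map with one entry mapping the
two-character key "id" to a DataAtom whose getMemorySize() is 38 would report
(2 * 2 + 8 + 4) = 16 bytes for the key, 38 + 4 = 42 for the value and its
reference, plus the fixed 2 * 8 + 4 = 20 bytes of overhead, 78 bytes total
(figures per the OBJECT_SIZE and REF_SIZE constants in Datum.java below).
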
Modified: incubator/pig/trunk/src/org/apache/pig/data/Datum.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/Datum.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/Datum.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/Datum.java Fri Jan  4 14:58:20 2008
@@ -32,10 +32,15 @@
     public static final byte RECORD_2 = 0x31;
     public static final byte RECORD_3 = 0x41;
 
+	public static final int OBJECT_SIZE = 8;
+	public static final int REF_SIZE = 4;
+
 	@Override
 	public abstract boolean equals(Object o);
 	
 	public abstract void write(DataOutput out) throws IOException;
+
+	public abstract long getMemorySize();
 	
 	     
 }

Added: incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java?rev=609048&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java Fri Jan  4 14:58:20 2008
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+
+import org.apache.pig.impl.util.PigLogger;
+
+
+/**
+ * An unordered collection of Tuples (possibly) with multiples.
+ */
+public class DefaultDataBag extends DataBag {
+
+    public DefaultDataBag() {
+        mContents = new ArrayList<Tuple>();
+    }
+
+    @Override
+    public boolean isSorted() {
+        return false;
+    }
+    
+    @Override
+    public boolean isDistinct() {
+        return false;
+    }
+    
+    @Override
+    public Iterator<Tuple> iterator() {
+        return new DefaultDataBagIterator();
+    }
+
+    public long spill() {
+        // Make sure we have something to spill.  Don't create empty
+        // files, as that will make a mess.
+        if (mContents.size() == 0) return 0;
+
+        // Lock the container before I spill, so that iterators aren't
+        // trying to read while I'm mucking with the container.
+        long spilled = 0;
+        synchronized (mContents) {
+            try {
+                DataOutputStream out = getSpillFile();
+                Iterator<Tuple> i = mContents.iterator();
+                while (i.hasNext()) {
+                    i.next().write(out);
+                    spilled++;
+                    // Report progress every 16384 records.
+                    if ((spilled & 0x3fff) == 0) reportProgress();
+                }
+                out.flush();
+            } catch (IOException ioe) {
+                // Remove the last file from the spilled array, since we failed to
+                // write to it.
+                mSpillFiles.remove(mSpillFiles.size() - 1);
+                PigLogger.getLogger().error(
+                    "Unable to spill contents to disk", ioe);
+                return 0;
+            }
+            mContents.clear();
+        }
+        return spilled;
+    }
+
+    /**
+     * An iterator that handles getting the next tuple from the bag.  This
+     * iterator has a couple of issues to deal with.  First, data can be
+     * stored in a combination of in memory and on disk.  Second, the bag
+     * may be asked to spill while the iterator is reading it.  This means
+     * that it will be pointing to someplace in memory and suddenly it
+     * will need to switch to a disk file.
+     */
+    private class DefaultDataBagIterator implements Iterator<Tuple> {
+        // We have to buffer a tuple because there's no easy way for next
+        // to tell whether or not there's another tuple available, other
+        // than to read it.
+        private Tuple mBuf = null;
+        private int mMemoryPtr = 0;
+        private int mFilePtr = 0;
+        private DataInputStream mIn = null;
+        private int mCntr = 0;
+
+        DefaultDataBagIterator() {
+        }
+
+        public boolean hasNext() { 
+            // See if we can find a tuple.  If so, buffer it.
+            mBuf = next();
+            return mBuf != null;
+        }
+
+        public Tuple next() {
+            // This will report progress every 1024 times through next.
+            // This should be much faster than using mod.
+            if ((mCntr++ & 0x3ff) == 0) reportProgress();
+
+            // If there's one in the buffer, use that one.
+            if (mBuf != null) {
+                Tuple t = mBuf;
+                mBuf = null;
+                return t;
+            }
+
+            // See if we've been reading from memory or not.
+            if (mMemoryPtr > 0) {
+                // If there's still data in memory, keep reading from
+                // there.
+                // Lock before we check the size; from this point
+                // forward the container can't spill on us.
+                synchronized (mContents) {
+                    if (mContents.size() > 0) {
+                        return readFromMemory();
+                    }
+                }
+
+                // The container spilled since our last read.  Don't
+                // need to hold the lock now, as it's already
+                // spilled on us.
+
+                // Our file pointer will already point to the new
+                // spill file (because it was either already 0 or had
+                // been incremented past the end of the old
+                // mSpillFiles.size()).  We need to open the new file
+                // and then fast forward past all of the tuples we've
+                // already read.  Then we need to reset mMemoryPtr so
+                // we know to read from the file next time we come
+                // through.
+                try {
+                    mIn = new DataInputStream(new BufferedInputStream(
+                        new FileInputStream(mSpillFiles.get(mFilePtr++))));
+                } catch (FileNotFoundException fnfe) {
+                    // We can't find our own spill file?  That should never
+                    // happen.
+                    PigLogger.getLogger().fatal(
+                        "Unable to find our spill file", fnfe);
+                    throw new RuntimeException(fnfe);
+                }
+                Tuple t = new Tuple();
+                for (int i = 0; i < mMemoryPtr; i++) {
+                    try {
+                        t.readFields(mIn);
+                    } catch (EOFException eof) {
+                        // This should never happen, it means we
+                        // didn't dump all of our tuples to disk.
+                        PigLogger.getLogger().fatal(
+                            "Ran out of tuples too soon.", eof);
+                        throw new RuntimeException("Ran out of tuples to read prematurely.");
+                    } catch (IOException ioe) {
+                        PigLogger.getLogger().fatal(
+                            "Unable to read our spill file", ioe);
+                        throw new RuntimeException(ioe);
+                    }
+                }
+                mMemoryPtr = 0;
+                return readFromFile();
+            }
+
+            // We haven't read from memory yet, so keep trying to read
+            // from the file
+            return readFromFile();
+        }
+
+        /**
+         * Not implemented.
+         */
+        public void remove() {}
+
+        private Tuple readFromFile() {
+            if (mIn != null) {
+                // We already have a file open
+                Tuple t = new Tuple();
+                try {
+                    t.readFields(mIn);
+                    return t;
+                } catch (EOFException eof) {
+                    // Fall through to the next case where we find the
+                    // next file, or go to memory
+                } catch (IOException ioe) {
+                    PigLogger.getLogger().fatal(
+                        "Unable to read our spill file", ioe);
+                    throw new RuntimeException(ioe);
+                }
+            }
+
+            // Need to open the next file, if there is one.  Have to lock
+            // here, because otherwise we could decide there's no more
+            // files and between the time we decide that and start trying
+            // to read from memory the container could spill, and then
+            // we're stuck.  If there's another file to read, we can
+            // unlock immediately.  If there isn't, we need to hold the
+            // lock and go into readFromMemory().
+            synchronized (mContents) {
+                if (mSpillFiles == null || mFilePtr >= mSpillFiles.size()) {
+                    // We've read everything there is to read from the files, go
+                    // look in memory.
+                    return readFromMemory();
+                }
+            }
+
+            // Open the next file, then call ourselves again as it
+            // will enter the if above.
+            try {
+                mIn = new DataInputStream(new BufferedInputStream(
+                    new FileInputStream(mSpillFiles.get(mFilePtr++))));
+            } catch (FileNotFoundException fnfe) {
+                // We can't find our own spill file?  That should never
+                // happen.
+                PigLogger.getLogger().fatal("Unable to find our spill file",
+                    fnfe);
+                throw new RuntimeException(fnfe);
+            }
+            return readFromFile();
+        }
+
+        // This should only be called once we know we haven't spilled.  It
+        // assumes that the mContents lock is already held before we enter
+        // this function.
+        private Tuple readFromMemory() {
+            if (mContents.size() == 0) return null;
+
+            if (mMemoryPtr < mContents.size()) {
+                return ((ArrayList<Tuple>)mContents).get(mMemoryPtr++);
+            } else {
+                return null;
+            }
+        }
+    }
+}
+

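The spill/iterate handshake above can be exercised end to end: spill() writes
everything in mContents to a temp file and clears memory, and an iterator
drains any spill files before falling back to what remains in memory. A small
illustrative sketch (assumes spill() is callable through the Spillable
interface, as the memory manager requires, and uses the no-arg Tuple
constructor the same way this commit's read() code does):

    import java.util.Iterator;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    public class SpillDemo {
        public static void main(String[] args) {
            DataBag bag = BagFactory.getInstance().newDefaultBag();
            for (int i = 0; i < 10000; i++) bag.add(new Tuple());
            bag.spill();   // the 10000 tuples move to a temp spill file
            for (int i = 0; i < 10; i++) bag.add(new Tuple());

            long n = 0;
            Iterator<Tuple> it = bag.iterator(); // spill file first, then memory
            while (it.hasNext()) { it.next(); n++; }
            System.out.println(n + " of " + bag.size()); // 10010 of 10010
        }
    }
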
Added: incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java?rev=609048&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java Fri Jan  4 14:58:20 2008
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.TreeSet;
+import java.util.Arrays;
+import java.io.BufferedInputStream;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.util.PigLogger;
+
+
+
+/**
+ * An unordered collection of Tuples with no multiples.  Data is
+ * stored without duplicates as it comes in.  When it is time to spill,
+ * that data is sorted and written to disk.  It must also be sorted upon
+ * the first read, otherwise if a spill happened after that the iterators
+ * would have no way to find their place in the new file.  The data is
+ * stored in a HashSet.  When it is time to sort, it is placed in an
+ * ArrayList and then sorted.  Despite all these machinations, this was
+ * found to be faster than storing it in a TreeSet.
+ */
+public class DistinctDataBag extends DataBag {
+    public DistinctDataBag() {
+        mContents = new HashSet<Tuple>();
+    }
+
+    @Override
+    public boolean isSorted() {
+        return false;
+    }
+    
+    @Override
+    public boolean isDistinct() {
+        return true;
+    }
+    
+    @Override
+    public Iterator<Tuple> iterator() {
+        return new DistinctDataBagIterator();
+    }
+
+    @Override
+    public void add(Tuple t) {
+        synchronized (mContents) {
+            if (mContents.add(t)) {
+                mSize++;
+            }
+        }
+    }
+
+    @Override
+    public void addAll(DataBag b) {
+        synchronized (mContents) {
+            Iterator<Tuple> i = b.iterator();
+            while (i.hasNext()) {
+                if (mContents.add(i.next())) {
+                    mSize++;
+                }
+            }
+        }
+    }
+
+
+    public long spill() {
+        // Make sure we have something to spill.  Don't create empty
+        // files, as that will make a mess.
+        if (mContents.size() == 0) return 0;
+
+        // Lock the container before I spill, so that iterators aren't
+        // trying to read while I'm mucking with the container.
+        long spilled = 0;
+        synchronized (mContents) {
+            try {
+                DataOutputStream out = getSpillFile();
+                // If we've already started reading, then it will already be
+                // sorted into an array list.  If not, we need to sort it
+                // before writing.
+                if (mContents instanceof ArrayList) {
+                    Iterator<Tuple> i = mContents.iterator();
+                    while (i.hasNext()) {
+                        i.next().write(out);
+                        spilled++;
+                        // Report progress every 16384 records.
+                        if ((spilled & 0x3fff) == 0) reportProgress();
+                    }
+                } else {
+                    Tuple[] array = new Tuple[mContents.size()];
+                    mContents.toArray(array);
+                    Arrays.sort(array);
+                    for (int i = 0; i < array.length; i++) {
+                        array[i].write(out);
+                        spilled++;
+                        // Report progress every 16384 records.
+                        if ((spilled & 0x3fff) == 0) reportProgress();
+                    }
+                }
+                out.flush();
+            } catch (IOException ioe) {
+                // Remove the last file from the spilled array, since we failed to
+                // write to it.
+                mSpillFiles.remove(mSpillFiles.size() - 1);
+                PigLogger.getLogger().error(
+                    "Unable to spill contents to disk", ioe);
+                return 0;
+            }
+            mContents.clear();
+        }
+        return spilled;
+    }
+
+    /**
+     * An iterator that handles getting the next tuple from the bag.  This
+     * iterator has a couple of issues to deal with.  First, data can be
+     * stored in a combination of in memory and on disk.  Second, the bag
+     * may be asked to spill while the iterator is reading it.  This means
+     * that it will be pointing to someplace in memory and suddenly it
+     * will need to switch to a disk file.
+     */
+    private class DistinctDataBagIterator implements Iterator<Tuple> {
+
+        private class TContainer implements Comparable<TContainer> {
+            public Tuple tuple;
+            public int fileNum;
+
+            public int compareTo(TContainer other) {
+                return tuple.compareTo(other.tuple);
+            }
+        }
+
+        // We have to buffer a tuple because there's no easy way for next
+        // to tell whether or not there's another tuple available, other
+        // than to read it.
+        private Tuple mBuf = null;
+        private int mMemoryPtr = 0;
+        private TreeSet<TContainer> mMergeTree = null;
+        private ArrayList<DataInputStream> mStreams = null;
+        private int mCntr = 0;
+
+        DistinctDataBagIterator() {
+            // If this is the first read, we need to sort the data.
+            synchronized (mContents) {
+                if (mContents instanceof HashSet) {
+                    preMerge();
+                    // We're the first reader, we need to sort the data.
+                    // This is in case it gets dumped under us.
+                    ArrayList<Tuple> l = new ArrayList<Tuple>(mContents);
+                    Collections.sort(l);
+                    mContents = l;
+                }
+            }
+        }
+
+        public boolean hasNext() { 
+            // See if we can find a tuple.  If so, buffer it.
+            mBuf = next();
+            return mBuf != null;
+        }
+
+        public Tuple next() {
+            // This will report progress every 1024 times through next.
+            // This should be much faster than using mod.
+            if ((mCntr++ & 0x3ff) == 0) reportProgress();
+
+            // If there's one in the buffer, use that one.
+            if (mBuf != null) {
+                Tuple t = mBuf;
+                mBuf = null;
+                return t;
+            }
+
+            // Check to see if we just need to read from memory.
+            boolean spilled = false;
+            synchronized (mContents) {
+                if (mSpillFiles == null || mSpillFiles.size() == 0) {
+                    return readFromMemory();
+                }
+
+                if (mMemoryPtr > 0 && mContents.size() == 0) {
+                    spilled = true;
+                }
+            }
+
+            // Check to see if we were reading from memory but we spilled
+            if (spilled) {
+                DataInputStream in;
+                // We need to open the new file
+                // and then fast forward past all of the tuples we've
+                // already read.  Then we need to place the first tuple
+                // from that file in the priority queue.  Whatever tuples
+                // from memory that were already in the queue will be fine,
+                // as they're guaranteed to be ahead of the point we fast
+                // forward to.
+                try {
+                    in = new DataInputStream(new BufferedInputStream(
+                        new FileInputStream(mSpillFiles.get(
+                                mSpillFiles.size() - 1))));
+                    if (mStreams == null) {
+                        mMergeTree = new TreeSet<TContainer>();
+                        // We didn't have any files before this spill.
+                        mStreams = new ArrayList<DataInputStream>(1);
+                    }
+                    mStreams.add(in);
+                } catch (FileNotFoundException fnfe) {
+                    // We can't find our own spill file?  That should never
+                    // happen.
+                    PigLogger.getLogger().fatal(
+                        "Unable to find our spill file", fnfe);
+                    throw new RuntimeException(fnfe);
+                }
+
+                // Fast forward past the tuples we've already put in the
+                // queue.
+                Tuple t = new Tuple();
+                for (int i = 0; i < mMemoryPtr; i++) {
+                    try {
+                        t.readFields(in);
+                    } catch (EOFException eof) {
+                        // This should never happen, it means we
+                        // didn't dump all of our tuples to disk.
+                        throw new RuntimeException("Ran out of tuples to read prematurely.");
+                    } catch (IOException ioe) {
+                        PigLogger.getLogger().fatal(
+                            "Unable to read our spill file", ioe);
+                        throw new RuntimeException(ioe);
+                    }
+                }
+                mMemoryPtr = 0;
+                // Add the next tuple from this file to the queue.
+                addToQueue(null, mSpillFiles.size() - 1);
+                // Fall through to read the next entry from the priority
+                // queue.
+            }
+
+            // We have spill files, so we need to read the next tuple from
+            // one of those files or from memory.
+            return readFromTree();
+        }
+
+        /**
+         * Not implemented.
+         */
+        public void remove() { throw new UnsupportedOperationException(); }
+
+        private Tuple readFromTree() {
+            if (mMergeTree == null) {
+                // First read, we need to set up the queue and the array of
+                // file streams
+                mMergeTree = new TreeSet<TContainer>();
+
+                // Add one to the size in case we spill later.
+                mStreams =
+                    new ArrayList<DataInputStream>(mSpillFiles.size() + 1);
+
+                Iterator<File> i = mSpillFiles.iterator();
+                while (i.hasNext()) {
+                    try {
+                        DataInputStream in = 
+                            new DataInputStream(new BufferedInputStream(
+                                new FileInputStream(i.next())));
+                        mStreams.add(in);
+                        // Add the first tuple from this file into the
+                        // merge queue.
+                        addToQueue(null, mStreams.size() - 1);
+                    } catch (FileNotFoundException fnfe) {
+                        // We can't find our own spill file?  That should
+                        // never happen.
+                        PigLogger.getLogger().fatal(
+                            "Unable to find our spill file.", fnfe);
+                        throw new RuntimeException(fnfe);
+                    }
+                }
+
+                // Prime one from memory too
+                if (mContents.size() > 0) {
+                    addToQueue(null, -1);
+                }
+            }
+
+            if (mMergeTree.size() == 0) return null;
+
+            // Pop the top one off the queue
+            TContainer c = mMergeTree.first();
+            mMergeTree.remove(c);
+
+            // Add the next tuple from wherever we read from into the
+            // queue.  Buffer the tuple we're returning, as we'll be
+            // reusing c.
+            Tuple t = c.tuple;
+            addToQueue(c, c.fileNum);
+
+            return t;
+        }
+
+        private void addToQueue(TContainer c, int fileNum) {
+            if (c == null) {
+                c = new TContainer();
+            }
+            c.fileNum = fileNum;
+
+            if (fileNum == -1) {
+                // Need to read from memory.  We may have spilled since
+                // this tuple was put in the queue, and hence memory might
+                // be empty.  But I don't care, as then I just won't add
+                // any more from memory.
+                synchronized (mContents) {
+                    do {
+                        c.tuple = readFromMemory();
+                        if (c.tuple != null) {
+                            // If we find a unique entry, then add it to the queue.
+                            // Otherwise ignore it and keep reading.
+                            if (mMergeTree.add(c)) {
+                                return;
+                            }
+                        }
+                    } while (c.tuple != null);
+                }
+                return;
+            }
+
+            // Read the next tuple from the indicated file
+            DataInputStream in = mStreams.get(fileNum);
+            if (in != null) {
+                // There's still data in this file
+                c.tuple = new Tuple();
+                do {
+                    try {
+                        c.tuple.readFields(in);
+                        // If we find a unique entry, then add it to the queue.
+                        // Otherwise ignore it and keep reading.  If we run out
+                        // of tuples to read that's fine, we just won't add a
+                        // new one from this file.
+                        if (mMergeTree.add(c)) {
+                            return;
+                        }
+                    } catch (EOFException eof) {
+                        // Out of tuples in this file.  Set our slot in the
+                        // array to null so we don't keep trying to read from
+                        // this file.
+                        mStreams.set(fileNum, null);
+                        return;
+                    } catch (IOException ioe) {
+                        PigLogger.getLogger().fatal(
+                            "Unable to read our spill file", ioe);
+                        throw new RuntimeException(ioe);
+                    }
+                } while (true);
+            }
+        }
+
+        // Function assumes that the reader lock is already held before we enter
+        // this function.
+        private Tuple readFromMemory() {
+            if (mContents.size() == 0) return null;
+
+            if (mMemoryPtr < mContents.size()) {
+                return ((ArrayList<Tuple>)mContents).get(mMemoryPtr++);
+            } else {
+                return null;
+            }
+        }
+
+        /**
+         * Pre-merge if there are too many spill files.  This avoids the issue
+         * of having too large a fan-out in our merge.  Experimentation by
+         * the Hadoop team has shown that 100 is about the optimal number
+         * of spill files.  This function modifies the mSpillFiles array
+         * and assumes the write lock is already held. It will not unlock it.
+         *
+         * Tuples are reconstituted as tuples, evaluated, and rewritten as
+         * tuples.  This is expensive, but I don't know how to read tuples
+         * from the file otherwise.
+         *
+         * This function is slightly different from the one in
+         * SortedDataBag, as it uses a TreeSet instead of a PriorityQueue.
+         */
+        private void preMerge() {
+            if (mSpillFiles == null ||
+                    mSpillFiles.size() <= MAX_SPILL_FILES) {
+                return;
+            }
+
+            // While there are more than max spill files, gather max spill
+            // files together and merge them into one file.  Then remove the merged files
+            // from mSpillFiles.  The new spill files are attached at the
+            // end of the list, so I can just keep going until I get a
+            // small enough number without too much concern over uneven
+            // size merges.  Convert mSpillFiles to a linked list since
+            // we'll be removing pieces from the middle and we want to do
+            // it efficiently.
+            try {
+                LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+                while (ll.size() > MAX_SPILL_FILES) {
+                    ListIterator<File> i = ll.listIterator();
+                    mStreams =
+                        new ArrayList<DataInputStream>(MAX_SPILL_FILES);
+                    mMergeTree = new TreeSet<TContainer>();
+
+                    for (int j = 0; j < MAX_SPILL_FILES; j++) {
+                        try {
+                            DataInputStream in =
+                                new DataInputStream(new BufferedInputStream(
+                                    new FileInputStream(i.next())));
+                            mStreams.add(in);
+                            addToQueue(null, mStreams.size() - 1);
+                            i.remove();
+                        } catch (FileNotFoundException fnfe) {
+                            // We can't find our own spill file?  That should
+                            // never happen.
+                            PigLogger.getLogger().fatal(
+                                "Unable to find our spill file.", fnfe);
+                            throw new RuntimeException(fnfe);
+                        }
+                    }
+
+                    // Get a new spill file.  This adds one to the end of
+                    // the spill files list.  So I need to append it to my
+                    // linked list as well so that it's still there when I
+                    // move my linked list back to the spill files.
+                    try {
+                        DataOutputStream out = getSpillFile();
+                        ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
+                        Tuple t;
+                        while ((t = readFromTree()) != null) {
+                            t.write(out);
+                        }
+                        out.flush();
+                    } catch (IOException ioe) {
+                        PigLogger.getLogger().fatal(
+                            "Unable to write to our spill file", ioe);
+                        throw new RuntimeException(ioe);
+                    }
+                }
+
+                // Now, move our new list back to the spill files array.
+                mSpillFiles = new ArrayList<File>(ll);
+            } finally {
+                // Reset mStreams and mMergeTree so that they'll be allocated
+                // properly for regular merging.
+                mStreams = null;
+                mMergeTree = null;
+            }
+        }
+    }
+    
+}
+

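The duplicate elimination in the merge above is easier to see in miniature. The following sketch is illustrative only; it is not part of this commit, and the Entry and mergeDistinct names are invented. It merges several sorted, internally duplicate-free lists the way the iterator above merges spill files: each source keeps at most one entry in a TreeSet, and a false return from TreeSet.add() is what drops a value that another source has already queued.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.TreeSet;

    public class DistinctMergeSketch {
        // Pairs a value with the index of the source it came from,
        // the way TContainer pairs a tuple with its file number.
        static class Entry implements Comparable<Entry> {
            final String value;
            final int src;
            Entry(String value, int src) { this.value = value; this.src = src; }
            public int compareTo(Entry other) { return value.compareTo(other.value); }
        }

        public static List<String> mergeDistinct(List<List<String>> sources) {
            TreeSet<Entry> tree = new TreeSet<Entry>();
            int[] pos = new int[sources.size()];

            // Prime the tree with one unique entry per source.
            for (int i = 0; i < sources.size(); i++) advance(tree, sources, pos, i);

            List<String> out = new ArrayList<String>();
            while (!tree.isEmpty()) {
                Entry e = tree.first();   // smallest value still queued
                tree.remove(e);
                out.add(e.value);
                advance(tree, sources, pos, e.src);  // refill from that source
            }
            return out;
        }

        // Keep reading from source i until add() accepts a value, mirroring
        // the do/while loop in the iterator's addToQueue() above.
        static void advance(TreeSet<Entry> tree, List<List<String>> sources,
                            int[] pos, int i) {
            List<String> s = sources.get(i);
            while (pos[i] < s.size()) {
                // add() returns false if an equal value is already queued;
                // skip it and keep reading, as addToQueue() does.
                if (tree.add(new Entry(s.get(pos[i]++), i))) return;
            }
        }

        public static void main(String[] args) {
            System.out.println(mergeDistinct(Arrays.asList(
                Arrays.asList("a", "b", "d"),
                Arrays.asList("b", "c", "d"))));  // prints [a, b, c, d]
        }
    }

The sketch leans on each source being sorted and free of internal duplicates, the invariant each spill file is expected to satisfy; duplicates across sources are the ones the TreeSet filters out.
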
Added: incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java?rev=609048&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java Fri Jan  4 14:58:20 2008
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.PriorityQueue;
+import java.util.Iterator;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.util.PigLogger;
+
+
+
+/**
+ * An ordered collection of Tuples (possibly with duplicates).  Data is
+ * stored unsorted as it comes in, and only sorted when it is time to dump
+ * it to a file or when the first iterator is requested.  Experimentation
+ * found this to be faster than storing it sorted to begin with.
+ */
+public class SortedDataBag extends DataBag {
+    private Comparator<Tuple> mComp;
+    private boolean mReadStarted = false;
+
+    private class DefaultComparator implements Comparator<Tuple> {
+        public int compare(Tuple t1, Tuple t2) {
+            return t1.compareTo(t2);
+        }
+
+        public boolean equals(Object o) {
+            return false;
+        }
+
+    }
+
+    public SortedDataBag(EvalSpec spec) {
+        if (spec == null) {
+            mComp = new DefaultComparator();
+        } else {
+            mComp = spec.getComparator();
+        }
+
+        mContents = new ArrayList<Tuple>();
+    }
+
+    @Override
+    public boolean isSorted() {
+        return true;
+    }
+    
+    @Override
+    public boolean isDistinct() {
+        return false;
+    }
+    
+    @Override
+    public Iterator<Tuple> iterator() {
+        return new SortedDataBagIterator();
+    }
+
+    public long spill() {
+        // Make sure we have something to spill.  Don't create empty
+        // files, as that will make a mess.
+        if (mContents.size() == 0) return 0;
+
+        // Lock the container before I spill, so that iterators aren't
+        // trying to read while I'm mucking with the container.
+        long spilled = 0;
+        synchronized (mContents) {
+            try {
+                DataOutputStream out = getSpillFile();
+                // Have to sort the data before we can dump it.  It's bogus
+                // that we have to do this under the lock, but there's no way
+                // around it.
+                Collections.sort((ArrayList<Tuple>)mContents, mComp);
+                Iterator<Tuple> i = mContents.iterator();
+                while (i.hasNext()) {
+                    i.next().write(out);
+                    spilled++;
+                    // This will report progress every 16384 records.
+                    if ((spilled & 0x3fff) == 0) reportProgress();
+                }
+                out.flush();
+            } catch (IOException ioe) {
+                // Remove the last file from the spilled array, since we failed to
+                // write to it.
+                mSpillFiles.remove(mSpillFiles.size() - 1);
+                PigLogger.getLogger().error(
+                    "Unable to spill contents to disk", ioe);
+                return 0;
+            }
+            mContents.clear();
+        }
+        return spilled;
+    }
+
+    /**
+     * An iterator that handles getting the next tuple from the bag.  This
+     * iterator has a couple of issues to deal with.  First, data can be
+         * stored partly in memory and partly on disk.  Second, the bag
+     * may be asked to spill while the iterator is reading it.  This means
+     * that it will be pointing to someplace in memory and suddenly it
+     * will need to switch to a disk file.
+     */
+    private class SortedDataBagIterator implements Iterator<Tuple> {
+
+        private class PQContainer implements Comparable<PQContainer> {
+            public Tuple tuple;
+            public int fileNum;
+
+            public int compareTo(PQContainer other) {
+                return mComp.compare(tuple, other.tuple);
+            }
+        }
+
+        // We have to buffer a tuple because there's no easy way for next
+        // to tell whether or not there's another tuple available, other
+        // than to read it.
+        private Tuple mBuf = null;
+        private int mMemoryPtr = 0;
+        private PriorityQueue<PQContainer> mMergeQ = null;
+        private ArrayList<DataInputStream> mStreams = null;
+        private int mCntr = 0;
+
+        SortedDataBagIterator() {
+            // If this is the first read, we need to sort the data.
+            synchronized (mContents) {
+                if (!mReadStarted) {
+                    preMerge();
+                    Collections.sort((ArrayList<Tuple>)mContents, mComp);
+                    mReadStarted = true;
+                }
+            }
+        }
+
+        public boolean hasNext() { 
+            // See if we can find a tuple.  If so, buffer it.
+            mBuf = next();
+            return mBuf != null;
+        }
+
+        public Tuple next() {
+            // This will report progress every 1024 times through next.
+            // This should be much faster than using mod.
+            if ((mCntr++ & 0x3ff) == 0) reportProgress();
+
+            // If there's one in the buffer, use that one.
+            if (mBuf != null) {
+                Tuple t = mBuf;
+                mBuf = null;
+                return t;
+            }
+
+            // Check to see if we just need to read from memory.
+            boolean spilled = false;
+            synchronized (mContents) {
+                if (mSpillFiles == null || mSpillFiles.size() == 0) {
+                    return readFromMemory();
+                }
+
+                // Check to see if we were reading from memory but we spilled
+                if (mMemoryPtr > 0 && mContents.size() == 0) {
+                    spilled = true;
+                }
+            }
+
+            if (spilled) {
+                DataInputStream in;
+                // We need to open the new file
+                // and then fast forward past all of the tuples we've
+                // already read.  Then we need to place the first tuple
+                // from that file in the priority queue.  Whatever tuples
+                // from memory that were already in the queue will be fine,
+                // as they're guaranteed to be ahead of the point we fast
+                // forward to.
+                try {
+                    in = new DataInputStream(new BufferedInputStream(
+                        new FileInputStream(mSpillFiles.get(
+                                mSpillFiles.size() - 1))));
+                    if (mStreams == null) {
+                        // We didn't have any files before this spill.
+                        mMergeQ = new PriorityQueue<PQContainer>(1);
+                        mStreams = new ArrayList<DataInputStream>(1);
+                    }
+                    mStreams.add(in);
+                } catch (FileNotFoundException fnfe) {
+                    // We can't find our own spill file?  That should never
+                    // happen.
+                    PigLogger.getLogger().fatal(
+                        "Unable to find our spill file", fnfe);
+                    throw new RuntimeException(fnfe);
+                }
+
+                // Fast forward past the tuples we've already put in the
+                // queue.
+                Tuple t = new Tuple();
+                for (int i = 0; i < mMemoryPtr; i++) {
+                    try {
+                        t.readFields(in);
+                    } catch (EOFException eof) {
+                        // This should never happen, it means we
+                        // didn't dump all of our tuples to disk.
+                        PigLogger.getLogger().fatal(
+                            "Ran out of tuples too soon.", eof);
+                        throw new RuntimeException("Ran out of tuples to read prematurely.");
+                    } catch (IOException ioe) {
+                        PigLogger.getLogger().fatal(
+                            "Unable to read our spill file", ioe);
+                        throw new RuntimeException(ioe);
+                    }
+                }
+                mMemoryPtr = 0;
+                // Add the next tuple from this file to the queue.
+                addToQueue(null, mSpillFiles.size() - 1);
+                // Fall through to read the next entry from the priority
+                // queue.
+            }
+
+            // We have spill files, so we need to read the next tuple from
+            // one of those files or from memory.
+            return readFromPriorityQ();
+        }
+
+        /**
+         * Not implemented.
+         */
+        public void remove() { throw new UnsupportedOperationException(); }
+
+        private Tuple readFromPriorityQ() {
+            if (mMergeQ == null) {
+                // First read, we need to set up the queue and the array of
+                // file streams
+                // Add one to the size for the list in memory.
+                mMergeQ =
+                    new PriorityQueue<PQContainer>(mSpillFiles.size() + 1);
+
+                // Add one to the size in case we spill later.
+                mStreams =
+                    new ArrayList<DataInputStream>(mSpillFiles.size() + 1);
+
+                Iterator<File> i = mSpillFiles.iterator();
+                while (i.hasNext()) {
+                    try {
+                        DataInputStream in = 
+                            new DataInputStream(new BufferedInputStream(
+                                new FileInputStream(i.next())));
+                        mStreams.add(in);
+                        // Add the first tuple from this file into the
+                        // merge queue.
+                        addToQueue(null, mStreams.size() - 1);
+                    } catch (FileNotFoundException fnfe) {
+                        // We can't find our own spill file?  That should
+                        // never happen.
+                        PigLogger.getLogger().fatal(
+                            "Unable to find our spill file", fnfe);
+                        throw new RuntimeException(fnfe);
+                    }
+                }
+
+                // Prime one from memory too
+                if (mContents.size() > 0) {
+                    addToQueue(null, -1);
+                }
+            }
+
+            // Pop the top one off the queue
+            PQContainer c = mMergeQ.poll();
+            if (c == null) return null;
+
+            // Add the next tuple from wherever we read from into the
+            // queue.  Buffer the tuple we're returning, as we'll be
+            // reusing c.
+            Tuple t = c.tuple;
+            addToQueue(c, c.fileNum);
+
+            return t;
+        }
+
+        private void addToQueue(PQContainer c, int fileNum) {
+            if (c == null) {
+                c = new PQContainer();
+            }
+            c.fileNum = fileNum;
+
+            if (fileNum == -1) {
+                // Need to read from memory.  We may have spilled since
+                // this tuple was put in the queue, and hence memory might
+                // be empty.  But I don't care, as then I just won't add
+                // any more from memory.
+                synchronized (mContents) {
+                    c.tuple = readFromMemory();
+                }
+                if (c.tuple != null) {
+                    mMergeQ.add(c);
+                }
+                return;
+            }
+
+            // Read the next tuple from the indicated file
+            DataInputStream in = mStreams.get(fileNum);
+            if (in != null) {
+                // There's still data in this file
+                c.tuple = new Tuple();
+                try {
+                    c.tuple.readFields(in);
+                    mMergeQ.add(c);
+                } catch (EOFException eof) {
+                    // Out of tuples in this file.  Set our slot in the
+                    // array to null so we don't keep trying to read from
+                    // this file.
+                    mStreams.set(fileNum, null);
+                } catch (IOException ioe) {
+                    PigLogger.getLogger().fatal(
+                        "Unable to read our spill file", ioe);
+                    throw new RuntimeException(ioe);
+                }
+
+            }
+        }
+
+        // Function assumes that the reader lock is already held before we enter
+        // this function.
+        private Tuple readFromMemory() {
+            if (mContents.size() == 0) return null;
+
+            if (mMemoryPtr < mContents.size()) {
+                return ((ArrayList<Tuple>)mContents).get(mMemoryPtr++);
+            } else {
+                return null;
+            }
+        }
+
+        /**
+         * Pre-merge if there are too many spill files.  This avoids the issue
+         * of having too large a fan-out in our merge.  Experimentation by
+         * the Hadoop team has shown that 100 is about the optimal number
+         * of spill files.  This function modifies the mSpillFiles array
+         * and assumes the write lock is already held. It will not unlock it.
+         *
+         * Tuples are reconstituted as tuples, evaluated, and rewritten as
+         * tuples.  This is expensive, but I need to do this in order to
+         * use the sort spec that was provided to me.
+         */
+        private void preMerge() {
+            if (mSpillFiles == null ||
+                    mSpillFiles.size() <= MAX_SPILL_FILES) {
+                return;
+            }
+
+            // While there are more than max spill files, gather max spill
+            // files together and merge them into one file.  Then remove the merged files
+            // from mSpillFiles.  The new spill files are attached at the
+            // end of the list, so I can just keep going until I get a
+            // small enough number without too much concern over uneven
+            // size merges.  Convert mSpillFiles to a linked list since
+            // we'll be removing pieces from the middle and we want to do
+            // it efficiently.
+            try {
+                LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+                while (ll.size() > MAX_SPILL_FILES) {
+                    ListIterator<File> i = ll.listIterator();
+                    mStreams =
+                        new ArrayList<DataInputStream>(MAX_SPILL_FILES);
+                    mMergeQ = new PriorityQueue<PQContainer>(MAX_SPILL_FILES);
+
+                    for (int j = 0; j < MAX_SPILL_FILES; j++) {
+                        try {
+                            DataInputStream in =
+                                new DataInputStream(new BufferedInputStream(
+                                    new FileInputStream(i.next())));
+                            mStreams.add(in);
+                            addToQueue(null, mStreams.size() - 1);
+                            i.remove();
+                        } catch (FileNotFoundException fnfe) {
+                            // We can't find our own spill file?  That should
+                            // never happen.
+                            PigLogger.getLogger().fatal(
+                                "Unable to find our spill file", fnfe);
+                            throw new RuntimeException(fnfe);
+                        }
+                    }
+
+                    // Get a new spill file.  This adds one to the end of
+                    // the spill files list.  So I need to append it to my
+                    // linked list as well so that it's still there when I
+                    // move my linked list back to the spill files.
+                    try {
+                        DataOutputStream out = getSpillFile();
+                        ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
+                        Tuple t;
+                        while ((t = readFromPriorityQ()) != null) {
+                            t.write(out);
+                        }
+                        out.flush();
+                    } catch (IOException ioe) {
+                        PigLogger.getLogger().fatal(
+                            "Unable to write to our spill file", ioe);
+                        throw new RuntimeException(ioe);
+                    }
+                }
+
+                // Now, move our new list back to the spill files array.
+                mSpillFiles = new ArrayList<File>(ll);
+            } finally {
+                // Reset mStreams and mMergeQ so that they'll be allocated
+                // properly for regular merging.
+                mStreams = null;
+                mMergeQ = null;
+            }
+        }
+    }
+}
+

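The SortedDataBag merge above has the same shape as the distinct merge but must preserve equal tuples, which is why it uses a PriorityQueue (which happily holds elements that compare equal) where DistinctDataBag uses a TreeSet. A minimal sketch of the readFromPriorityQ() pattern, again with invented names and not part of this commit:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.PriorityQueue;

    public class SortedMergeSketch {
        static class Entry implements Comparable<Entry> {
            final String value;
            final int src;
            Entry(String value, int src) { this.value = value; this.src = src; }
            public int compareTo(Entry other) { return value.compareTo(other.value); }
        }

        public static List<String> merge(List<List<String>> sources) {
            PriorityQueue<Entry> q = new PriorityQueue<Entry>();
            int[] pos = new int[sources.size()];

            // Prime the queue with the head of each source, as
            // readFromPriorityQ() primes one tuple per spill file.
            for (int i = 0; i < sources.size(); i++) refill(q, sources, pos, i);

            List<String> out = new ArrayList<String>();
            Entry e;
            while ((e = q.poll()) != null) {
                out.add(e.value);
                refill(q, sources, pos, e.src);  // replace from the same source
            }
            return out;
        }

        static void refill(PriorityQueue<Entry> q, List<List<String>> sources,
                           int[] pos, int i) {
            List<String> s = sources.get(i);
            if (pos[i] < s.size()) q.add(new Entry(s.get(pos[i]++), i));
        }

        public static void main(String[] args) {
            System.out.println(merge(Arrays.asList(
                Arrays.asList("a", "b", "d"),
                Arrays.asList("b", "c"))));  // prints [a, b, b, c, d]
        }
    }

Polling the smallest entry and refilling from whichever source it came from keeps exactly one candidate per stream in the queue, so the merge's memory stays bounded by the number of spill files rather than their contents.
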
Modified: incubator/pig/trunk/src/org/apache/pig/data/Tuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/Tuple.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/Tuple.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/Tuple.java Fri Jan  4 14:58:20 2008
@@ -161,8 +161,8 @@
             }
         } else if (field instanceof DataBag) {
             DataBag b = (DataBag) field;
-            if (b.cardinality() == 1) {
-                Tuple t = b.content().next();
+            if (b.size() == 1) {
+                Tuple t = b.iterator().next();
                 if (t.arity() == 1) {
                     return t.getAtomField(0);
                 }
@@ -180,8 +180,8 @@
             return (Tuple) field;
         } else if (field instanceof DataBag) {
             DataBag b = (DataBag) field;
-            if (b.cardinality() == 1) {
-                return b.content().next();
+            if (b.size() == 1) {
+                return b.iterator().next();
             }
         }
 
@@ -356,5 +356,18 @@
                 break;
         }
         return i;
+    }
+
+    @Override
+    public long getMemorySize() {
+        long used = 0;
+        try {
+            int sz = fields.size();
+            for (int i = 0; i < sz; i++) used += getField(i).getMemorySize();
+            used += 2 * OBJECT_SIZE + REF_SIZE;
+        } catch (IOException ioe) {
+            // Not really much I can do here.
+        }
+        return used;
     }
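
The getMemorySize() method added to Tuple above (and to the other Datum types elsewhere in this commit) presumably exists so the new SpillableMemoryManager can decide which registered container to spill when memory runs low. The following is a hypothetical sketch of that division of labor, not the actual SpillableMemoryManager API; the Spillable interface and SpillChooser class are invented for this note:

    import java.util.ArrayList;
    import java.util.List;

    // Invented for illustration: anything that can estimate its footprint
    // and move itself to disk on demand.
    interface Spillable {
        long getMemorySize();   // estimated bytes currently held in memory
        long spill();           // dump contents to disk, return records written
    }

    class SpillChooser {
        private final List<Spillable> registered = new ArrayList<Spillable>();

        void register(Spillable s) { registered.add(s); }

        // Spill the largest consumer first; a real manager would keep
        // spilling until enough memory had been reclaimed.
        void relieveMemoryPressure() {
            Spillable biggest = null;
            for (Spillable s : registered) {
                if (biggest == null || s.getMemorySize() > biggest.getMemorySize()) {
                    biggest = s;
                }
            }
            if (biggest != null) biggest.spill();
        }
    }

Under this reading, registered bags report an estimated footprint through getMemorySize() and are asked to spill() when the JVM nears its heap limit.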
 }