You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/22 22:17:22 UTC

svn commit: r614325 [5/6] - in /incubator/pig/branches/types: ./ lib/ scripts/ src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/ src/org/apac...

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java Tue Jan 22 13:17:12 2008
@@ -26,8 +26,9 @@
 import org.apache.pig.data.AmendableTuple;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Datum;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.eval.EvalSpec;
 import org.apache.pig.impl.eval.collector.DataCollector;
 import org.apache.pig.impl.logicalLayer.LOCogroup;
@@ -40,7 +41,7 @@
 	 * 
 	 */
 	private static final long serialVersionUID = 1L;
-	List<Datum[]>[] sortedInputs;
+	List<Object[]>[] sortedInputs;
     List<EvalSpec>       specs;
         
     public POCogroup(List<EvalSpec> specs, int outputType) {
@@ -61,11 +62,11 @@
         for (int i = 0; i < inputs.length; i++) {
         	
         	final int finalI = i;
-            sortedInputs[i] = new ArrayList<Datum[]>();
+            sortedInputs[i] = new ArrayList<Object[]>();
             
             DataCollector outputFromSpec = new DataCollector(null){
             	@Override
-            	public void add(Datum d) {
+            	public void add(Object d) {
             		sortedInputs[finalI].add(LOCogroup.getGroupAndTuple(d));
             	}
             };
@@ -78,9 +79,9 @@
             }
             inputToSpec.finishPipe();
 
-            Collections.sort(sortedInputs[i], new Comparator<Datum[]>() {
-                public int compare(Datum[] a, Datum[] b) {
-                    return a[0].compareTo(b[0]);
+            Collections.sort(sortedInputs[i], new Comparator<Object[]>() {
+                public int compare(Object[] a, Object[] b) {
+                    return DataType.compare(a[0], b[0]);
                 }
             });
         }
@@ -95,11 +96,11 @@
 
             // find the smallest group among all inputs (this is the group we should make a tuple
             // out of)
-            Datum smallestGroup = null;
+            Object smallestGroup = null;
             for (int i = 0; i < inputs.length; i++) {
                 if (sortedInputs[i].size() > 0) {
-                    Datum g = (sortedInputs[i].get(0))[0];
-                    if (smallestGroup == null || g.compareTo(smallestGroup)<0)
+                    Object g = (sortedInputs[i].get(0))[0];
+                    if (smallestGroup == null || DataType.compare(g, smallestGroup)<0)
                         smallestGroup = g;
                 }
             }
@@ -112,26 +113,26 @@
             
             Tuple output;
             if (outputType == LogicalOperator.AMENDABLE) output = new AmendableTuple(1 + inputs.length, smallestGroup);
-            else output = new Tuple(1 + inputs.length);
+            else output = TupleFactory.getInstance().newTuple(1 + inputs.length);
 
             // set first field to the group tuple
-            output.setField(0, smallestGroup);
+            output.set(0, smallestGroup);
             
             if (lineageTracer != null) lineageTracer.insert(output);
 
             boolean done = true;
             for (int i = 0; i < inputs.length; i++) {
-                DataBag b =
-					BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
+                DataBag b = BagFactory.getInstance().newDefaultBag();
 
                 while (sortedInputs[i].size() > 0) {
-                    Datum g = sortedInputs[i].get(0)[0];
+                    Object g = sortedInputs[i].get(0)[0];
 
                     Tuple t = (Tuple) sortedInputs[i].get(0)[1];
 
-                    if (g.compareTo(smallestGroup) < 0) {
+                    int c = DataType.compare(g, smallestGroup);
+                    if (c < 0) {
                         sortedInputs[i].remove(0); // discard this tuple
-                    } else if (g.equals(smallestGroup)) {
+                    } else if (c == 0) {
                         b.add(t);
                         if (lineageTracer != null) lineageTracer.union(t, output);   // update lineage
                         sortedInputs[i].remove(0);
@@ -140,17 +141,21 @@
                     }
                 }
 
-                if (specs.get(i).isInner() && b.isEmpty())
+                if (specs.get(i).isInner() && (b.size() == 0))
                     done = false; // this input uses "inner" semantics, and it has no tuples for
                                     // this group, so suppress the tuple we're currently building
 
-                output.setField(1 + i, b);
+                output.set(1 + i, b);
             }
 
             if (done)
                 return output;
         }
 
+    }
+
+    public void visit(POVisitor v) {
+        v.visitCogroup(this);
     }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POEval.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POEval.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POEval.java Tue Jan 22 13:17:12 2008
@@ -97,4 +97,8 @@
         }
     }
        
+    public void visit(POVisitor v) {
+        v.visitEval(this);
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POLoad.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POLoad.java Tue Jan 22 13:17:12 2008
@@ -65,4 +65,9 @@
         return lf.getNext();
     }
 
+    @Override
+    public void visit(POVisitor v) {
+        v.visitLoad(this);
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POMapreduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POMapreduce.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POMapreduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POMapreduce.java Tue Jan 22 13:17:12 2008
@@ -19,7 +19,12 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 
+import org.apache.log4j.Logger;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.eval.EvalSpec;
@@ -27,25 +32,28 @@
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.mapreduceExec.MapReduceLauncher;
 import org.apache.pig.impl.util.ObjectSerializer;
-
+import org.apache.pig.impl.util.PigLogger;
 
 public class POMapreduce extends PhysicalOperator {
 	private static final long serialVersionUID = 1L;
 	
     public ArrayList<EvalSpec> toMap             = new ArrayList<EvalSpec>();
-    public ArrayList<EvalSpec>     toCombine         = null;
+    //public ArrayList<EvalSpec>     toCombine         = null;
+    public EvalSpec     toCombine         = null;
     public EvalSpec    toReduce          = null;
     public ArrayList<EvalSpec>  groupFuncs           = null;
     public SplitSpec        toSplit           = null;
     public ArrayList<FileSpec>           inputFileSpecs         = new ArrayList<FileSpec>();
     public FileSpec         outputFileSpec        = null;
     public Class           partitionFunction = null;
+    public Class<WritableComparator> userComparator = null;
     public String quantilesFile = null;
     public PigContext pigContext;
     
     public int                     mapParallelism       = -1;     // -1 means let hadoop decide
     public int                     reduceParallelism    = -1;
 
+    
     static MapReduceLauncher mapReduceLauncher = new MapReduceLauncher();
 
     
@@ -156,16 +164,16 @@
     }
 
     void print() {
-        System.out.println("\n----- MapReduce Job -----");
-        System.out.println("Input: " + inputFileSpecs);
-        System.out.println("Map: " + toMap);
-        System.out.println("Group: " + groupFuncs);
-        System.out.println("Combine: " + toCombine);
-        System.out.println("Reduce: " + toReduce);
-        System.out.println("Output: " + outputFileSpec);
-        System.out.println("Split: " + toSplit);
-        System.out.println("Map parallelism: " + mapParallelism);
-        System.out.println("Reduce parallelism: " + reduceParallelism);
+        Logger log = PigLogger.getLogger();
+        log.debug("Input: " + inputFileSpecs);
+        log.debug("Map: " + toMap);
+        log.debug("Group: " + groupFuncs);
+        log.debug("Combine: " + toCombine);
+        log.debug("Reduce: " + toReduce);
+        log.debug("Output: " + outputFileSpec);
+        log.debug("Split: " + toSplit);
+        log.debug("Map parallelism: " + mapParallelism);
+        log.debug("Reduce parallelism: " + reduceParallelism);
     }
     
     public POMapreduce copy(){
@@ -199,6 +207,10 @@
     		toReduce = spec;
     	else
     		toReduce = toReduce.addSpec(spec);
+    }
+
+    public void visit(POVisitor v) {
+        v.visitMapreduce(this);
     }
     
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java Tue Jan 22 13:17:12 2008
@@ -22,7 +22,6 @@
 
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
 
 
 public class PORead extends PhysicalOperator {
@@ -31,7 +30,7 @@
 	 */
 	private static final long serialVersionUID = 1L;
 	DataBag             bag;
-    Iterator<Datum> it;
+    Iterator<Tuple> it;
 
     public PORead(DataBag bagIn, int outputType) {
     	super(outputType);
@@ -47,7 +46,7 @@
     	if (continueFromLast){
     		throw new RuntimeException("LOReads should not occur in continuous plans");
     	}
-        it = bag.content();
+        it = bag.iterator();
 
         return true;
     }
@@ -55,9 +54,13 @@
     @Override
 	public Tuple getNext() throws IOException {
         if (it.hasNext())
-            return (Tuple)it.next();
+            return it.next();
         else
             return null;
+    }
+
+    public void visit(POVisitor v) {
+        v.visitRead(this);
     }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java Tue Jan 22 13:17:12 2008
@@ -24,14 +24,13 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.eval.EvalSpec;
 
 
 public class POSort extends PhysicalOperator {
 	static final long serialVersionUID = 1L; 
 	EvalSpec sortSpec;
-	transient Iterator<Datum> iter;
+	transient Iterator<Tuple> iter;
 	
 	
 	public POSort(EvalSpec sortSpec, int outputType) {
@@ -44,24 +43,27 @@
 	public boolean open(boolean continueFromLast) throws IOException {
 		if (!super.open(continueFromLast))
 			return false;
-		DataBag bag =
-			BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
+		DataBag bag = BagFactory.getInstance().newSortedBag(sortSpec);
 		
-		bag.sort(sortSpec);
 		Tuple t;
 		while((t = inputs[0].getNext())!=null){
 			bag.add(t);
 		}
-		iter = bag.content();
+		iter = bag.iterator();
 		return true;
 	}
 	
 	@Override
 	public Tuple getNext() throws IOException {
 		if (iter.hasNext())
-			return (Tuple)iter.next();
+			return iter.next();
 		else
 			return null;
 	}
+
+    @Override
+    public void visit(POVisitor v) {
+        v.visitSort(this);
+    }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitMaster.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitMaster.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitMaster.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitMaster.java Tue Jan 22 13:17:12 2008
@@ -109,4 +109,9 @@
         }
     }
 	*/
+
+    public void visit(POVisitor v) {
+        v.visitSplitMaster(this);
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitSlave.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitSlave.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitSlave.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitSlave.java Tue Jan 22 13:17:12 2008
@@ -42,4 +42,8 @@
         return master.slaveGetNext(this);
     }
 
+    public void visit(POVisitor v) {
+        v.visitSplit(this);
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java Tue Jan 22 13:17:12 2008
@@ -21,8 +21,8 @@
 
 import org.apache.pig.StoreFunc;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.io.PigFile;
@@ -63,7 +63,7 @@
     @Override
 	public Tuple getNext() throws IOException {
         // get all tuples from input, and store them.
-        DataBag b = new DataBag(Datum.DataType.TUPLE);
+        DataBag b = BagFactory.getInstance().newDefaultBag();
         Tuple t;
         while ((t = (Tuple) inputs[0].getNext()) != null) {
             b.add(t);
@@ -88,6 +88,11 @@
     	new RuntimeException().printStackTrace();
     	System.exit(1);
     	return -1;
+    }
+
+    @Override
+    public void visit(POVisitor v) {
+        v.visitStore(this);
     }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POUnion.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POUnion.java Tue Jan 22 13:17:12 2008
@@ -72,4 +72,8 @@
         return null;
     }
 
+    public void visit(POVisitor v) {
+        v.visitUnion(this);
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java Tue Jan 22 13:17:12 2008
@@ -71,4 +71,6 @@
     public int getOutputType(){
     	return outputType;
     }
+
+    public abstract void visit(POVisitor v);
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java Tue Jan 22 13:17:12 2008
@@ -21,7 +21,7 @@
 import java.util.Map;
 
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Datum;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 
@@ -34,7 +34,7 @@
     }
 
     public DataBag exec(boolean continueFromLast) throws IOException {
-        DataBag results = new DataBag(Datum.DataType.TUPLE);
+        DataBag results = BagFactory.getInstance().newDefaultBag();
 
         root.open(continueFromLast);
         Tuple t;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/DataBuffer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/DataBuffer.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/DataBuffer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/DataBuffer.java Tue Jan 22 13:17:12 2008
@@ -19,7 +19,6 @@
 
 import java.util.*;
 
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.eval.collector.DataCollector;
 
 
@@ -30,14 +29,14 @@
 		super(null);
 	}
 	
-    List<Datum> buf = Collections.synchronizedList(new LinkedList<Datum>());
+    List<Object> buf = Collections.synchronizedList(new LinkedList<Object>());
 
     @Override
-	public void add(Datum d){
+	public void add(Object d){
         if (d != null) buf.add(d);
     }
     
-    public Datum removeFirst(){
+    public Object removeFirst(){
         if (buf.isEmpty())
         	return null;
         else
@@ -48,8 +47,8 @@
      * This is a sequence we want to do frequently to accomodate the simple eval case, i.e., cases
      * where we know that running an eval spec one item should produce one and only one item.
      */
-    public Datum removeFirstAndAssertEmpty(){
-    	Datum d;
+    public Object removeFirstAndAssertEmpty(){
+    	Object d;
     	if (isStale() || (d = removeFirst()) == null){
     		throw new RuntimeException("Simple eval used but buffer found to be empty or stale");
     	}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java Tue Jan 22 13:17:12 2008
@@ -168,17 +168,29 @@
      */
     private static void mergeJar(JarOutputStream jarFile, String jar, String prefix, Map<String, String> contents)
             throws FileNotFoundException, IOException {
-        JarInputStream jis = new JarInputStream(new FileInputStream(jar));
+        JarInputStream jarInput = new JarInputStream(new FileInputStream(jar));
+        
+        mergeJar(jarFile, jarInput, prefix, contents);
+    }
+    
+    private static void mergeJar(JarOutputStream jarFile, URL jar, String prefix, Map<String, String> contents)
+    throws FileNotFoundException, IOException {
+        JarInputStream jarInput = new JarInputStream(jar.openStream());
+
+        mergeJar(jarFile, jarInput, prefix, contents);
+    }
+
+    private static void mergeJar(JarOutputStream jarFile, JarInputStream jarInput, String prefix, Map<String, String> contents)
+    throws FileNotFoundException, IOException {
         JarEntry entry;
-        while ((entry = jis.getNextJarEntry()) != null) {
+        while ((entry = jarInput.getNextJarEntry()) != null) {
             if (prefix != null && !entry.getName().startsWith(prefix)) {
                 continue;
             }
-            addStream(jarFile, entry.getName(), jis, contents);
+            addStream(jarFile, entry.getName(), jarInput, contents);
         }
     }
-    
-    /**
+        /**
      * Adds a stream to a Jar file.
      * 
      * @param os

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java Tue Jan 22 13:17:12 2008
@@ -36,7 +36,6 @@
 {
 	if (mLogger == null) {
 		mLogger = Logger.getLogger("org.apache.pig");
-		mLogger.setAdditivity(false);
 	}
 	return mLogger;
 }

Propchange: incubator/pig/branches/types/src/org/apache/pig/tools/grunt/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 22 13:17:12 2008
@@ -0,0 +1,7 @@
+
+TokenMgrError.java
+Token.java
+SimpleCharStream.java
+ParseException.java
+GruntParserTokenManager.java
+GruntParserConstants.java

Propchange: incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 22 13:17:12 2008
@@ -0,0 +1,8 @@
+
+TokenMgrError.java
+Token.java
+SimpleCharStream.java
+PigScriptParserTokenManager.java
+PigScriptParserConstants.java
+PigScriptParser.java
+ParseException.java

Propchange: incubator/pig/branches/types/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 22 13:17:12 2008
@@ -0,0 +1,2 @@
+
+reports

Added: incubator/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java?rev=614325&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java Tue Jan 22 13:17:12 2008
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import org.apache.pig.data.*;
+import org.apache.pig.impl.eval.EvalSpec;
+
+// Test data bag factory, for testing that we can properly provide a non
+// default bag factory.
+public class NonDefaultBagFactory extends BagFactory {
+    public DataBag newDefaultBag() { return null; }
+    public DataBag newSortedBag(EvalSpec sortSpec) { return null; }
+    public DataBag newDistinctBag() { return null; }
+
+    public NonDefaultBagFactory() { super(); }
+}
+

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java Tue Jan 22 13:17:12 2008
@@ -33,6 +33,36 @@
 public class TestAlgebraicEval extends TestCase {
     
 	private String initString = "mapreduce";
+    
+    @Test
+    public void testGroupCountWithMultipleFields() throws Exception {
+        int LOOP_COUNT = 1024;
+        PigServer pig = new PigServer(initString);
+        File tmpFile = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        for(int i = 0; i < LOOP_COUNT; i++) {
+            for(int j=0; j< LOOP_COUNT; j++) {
+                ps.println(i + "\t" + i + "\t" + j%2);
+            }
+        }
+        ps.close();
+        String query = "myid = foreach (group (load 'file:" + tmpFile + "') all) generate group, COUNT($1) ;";
+        System.out.println(query);
+        pig.registerQuery(" a = group (load 'file:" + tmpFile + "') by ($0,$1);");
+        pig.registerQuery("b = foreach a generate flatten(group), SUM($1.$2);");
+        Iterator<Tuple> it = pig.openIterator("b");
+        tmpFile.delete();
+        int count = 0;
+        while(it.hasNext()){
+            int sum = it.next().getAtomField(2).numval().intValue();
+            assertEquals(LOOP_COUNT/2, sum);
+            count++;
+        }
+        assertEquals(count, LOOP_COUNT);
+    }
+    
+    
+    
     @Test
     public void testSimpleCount() throws Exception {
         int LOOP_COUNT = 1024;
@@ -72,6 +102,8 @@
         Double count = t.getAtomField(1).numval();
         assertEquals(count, (double)LOOP_COUNT);
     }
+    
+    
     
     @Test
     public void testGroupReorderCount() throws Exception {

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java Tue Jan 22 13:17:12 2008
@@ -62,6 +62,48 @@
     }
 
     @Test
+    public void testAVGInitial() throws Exception {
+        int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+        EvalFunc<Tuple> avg = new AVG.Initial();
+        Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+        Tuple output = new Tuple();
+        avg.exec(tup, output);
+
+        assertEquals("Expected sum to be 55.0", 55.0,
+            output.getAtomField(0).numval());
+        assertEquals("Expected count to be 10", 10,
+            output.getAtomField(1).longVal());
+    }
+
+    @Test
+    public void testAVGFinal() throws Exception {
+        Tuple t1 = new Tuple(2);
+        t1.setField(0, 55.0);
+        t1.setField(1, 10);
+        Tuple t2 = new Tuple(2);
+        t2.setField(0, 28.0);
+        t2.setField(1, 7);
+        Tuple t3 = new Tuple(2);
+        t3.setField(0, 82.0);
+        t3.setField(1, 17);
+        DataBag bag = BagFactory.getInstance().newDefaultBag();
+        bag.add(t1);
+        bag.add(t2);
+        bag.add(t3);
+        
+        Tuple tup = new Tuple(bag);
+
+        EvalFunc<DataAtom> avg = new AVG.Final();
+        DataAtom output = new DataAtom();
+        avg.exec(tup, output);
+
+        assertEquals("Expected average to be 4.852941176470588",
+            4.852941176470588, output.numval());
+    }
+
+
+    @Test
     public void testCOUNT() throws Exception {
         int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
         double expected = input.length;
@@ -91,7 +133,7 @@
         count.exec(tup,output);
         assertTrue(output.numval() == 0);
         
-        map.put("a", "a");
+        map.put("a", new DataAtom("a"));
 
         assertFalse(isEmpty.exec(tup));
         count.exec(tup,output);
@@ -105,7 +147,32 @@
         assertTrue(output.numval() == 2);
         
     }
-    
+
+    @Test
+    public void testCOUNTInitial() throws Exception {
+        int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+        EvalFunc<Tuple> count = new COUNT.Initial();
+        Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+        Tuple output = new Tuple();
+        count.exec(tup, output);
+
+        assertEquals("Expected count to be 10", 10,
+            output.getAtomField(0).longVal());
+    }
+
+    @Test
+    public void testCOUNTFinal() throws Exception {
+        int input[] = { 23, 38, 39 };
+        Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+
+        EvalFunc<DataAtom> count = new COUNT.Final();
+        DataAtom output = new DataAtom();
+        count.exec(tup, output);
+
+        assertEquals("Expected count to be 100", 100, output.longVal());
+    }
+
     @Test
     public void testSUM() throws Exception {
         int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
@@ -121,6 +188,108 @@
         assertTrue(actual == expected);
     }
 
+    @Test
+    public void testSUMInitial() throws Exception {
+        int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+        EvalFunc<Tuple> sum = new SUM.Initial();
+        Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+        Tuple output = new Tuple();
+        sum.exec(tup, output);
+
+        assertEquals("Expected sum to be 55.0", 55.0,
+            output.getAtomField(0).numval());
+    }
+
+    @Test
+    public void testSUMFinal() throws Exception {
+        int input[] = { 23, 38, 39 };
+        Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+
+        EvalFunc<DataAtom> sum = new SUM.Final();
+        DataAtom output = new DataAtom();
+        sum.exec(tup, output);
+
+        assertEquals("Expected sum to be 100.0", 100.0, output.numval());
+    }
+
+    @Test
+    public void testMIN() throws Exception {
+        int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+        EvalFunc<DataAtom> min = new MIN();
+        Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+        DataAtom output = new DataAtom();
+        min.exec(tup, output);
+
+        assertEquals("Expected min to be 1.0", 1.0, output.numval());
+    }
+
+
+    @Test
+    public void testMINInitial() throws Exception {
+        int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+        EvalFunc<Tuple> min = new MIN.Initial();
+        Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+        Tuple output = new Tuple();
+        min.exec(tup, output);
+
+        assertEquals("Expected min to be 1.0", 1.0,
+            output.getAtomField(0).numval());
+    }
+
+    @Test
+    public void testMINFinal() throws Exception {
+        int input[] = { 23, 38, 39 };
+        Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+        // MIN.Final over the nested values 23, 38, 39 should yield 23.
+        EvalFunc<DataAtom> min = new MIN.Final();
+        DataAtom output = new DataAtom();
+        min.exec(tup, output);
+        // The failure message names the function under test (it used to say "sum").
+        assertEquals("Expected min to be 23.0", 23.0, output.numval());
+    }
+
+    @Test
+    public void testMAX() throws Exception {
+        int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+        EvalFunc<DataAtom> max = new MAX();
+        Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+        DataAtom output = new DataAtom();
+        max.exec(tup, output);
+
+        assertEquals("Expected max to be 10.0", 10.0, output.numval());
+    }
+
+
+    @Test
+    public void testMAXInitial() throws Exception {
+        int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+        EvalFunc<Tuple> max = new MAX.Initial();
+        Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+        Tuple output = new Tuple();
+        max.exec(tup, output);
+
+        assertEquals("Expected max to be 10.0", 10.0,
+            output.getAtomField(0).numval());
+    }
+
+    @Test
+    public void testMAXFinal() throws Exception {
+        int input[] = { 23, 38, 39 };
+        Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+        // MAX.Final over the nested values 23, 38, 39 should yield 39.
+        EvalFunc<DataAtom> max = new MAX.Final();
+        DataAtom output = new DataAtom();
+        max.exec(tup, output);
+        // The failure message names the function under test (it used to say "sum").
+        assertEquals("Expected max to be 39.0", 39.0, output.numval());
+    }
+
+
     // Builtin APPLY Functions
     // ========================
 
@@ -159,6 +328,7 @@
         assertTrue(f3.arity() == arity3);
     }
 
+    /*
     @Test
     public void testLFBin() throws Exception {
 
@@ -172,8 +342,7 @@
         t2.setField(0,a);
         Tuple t3 = new Tuple(1);
         t3.setField(0, b);
-        DataBag bag =
-			BagFactory.getInstance().getNewBigBag(Datum.DataType.TUPLE);
+        DataBag bag = BagFactory.getInstance().getNewBigBag();
         bag.add(t2);
         bag.add(t3);
         Tuple t4 = new Tuple(2);
@@ -192,8 +361,7 @@
         t6.setField(0,c);
         Tuple t7 = new Tuple(1);
         t7.setField(0, d);
-        DataBag bag2 =
-			BagFactory.getInstance().getNewBigBag(Datum.DataType.TUPLE);    
+        DataBag bag2 = BagFactory.getInstance().getNewBigBag();    
         for(int i = 0; i < 10; i ++) {
             bag2.add(t6);
             bag2.add(t7);
@@ -224,6 +392,7 @@
         assertTrue(r1.equals(t1));
         assertTrue(r2.equals(t5));
     }
+    */
 
     
     @Test
@@ -316,12 +485,8 @@
     	
     	for (int i=0; i< numTimes; i++){
     		Tuple t = iter.next();
-    		
-			Tuple t0 = (Tuple)t.getBagField(0).content().next();
-			Tuple t1 = (Tuple)t.getBagField(1).content().next();
-    		assertEquals(i+"AA", t0.getAtomField(0).strval());
-    		assertEquals(i+"BB", t1.getAtomField(0).strval());
-    		
+    		assertEquals(i+"AA", t.getBagField(0).iterator().next().getAtomField(0).strval());
+    		assertEquals(i+"BB", t.getBagField(1).iterator().next().getAtomField(0).strval());
     	}
     	
     	assertFalse(iter.hasNext());

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java Tue Jan 22 13:17:12 2008
@@ -17,698 +17,728 @@
  */
 package org.apache.pig.test;
 
-import java.io.DataInput;
-import java.io.DataOutput;
+/*
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileInputStream;
 import java.io.IOException;
-
-import java.util.List;
-import java.util.ArrayList;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 import java.util.Iterator;
+import java.util.Random;
+*/
+
+import java.util.*;
+import java.io.IOException;
 
 import org.junit.Test;
 
 import org.apache.pig.data.*;
+import org.apache.pig.impl.eval.*;
+import org.apache.pig.impl.util.Spillable;
 
 /**
- * This class will exercise the data bag data type.
+ * This class will exercise the basic Pig data model and members. It tests for proper behavior in
+ * assignment and comparison, as well as function application.
  * 
- * @author gates
+ * @author dnm
  */
-public class TestDataBag extends junit.framework.TestCase
-{
-
-public void testDefaultConstructor() throws Exception
-{
-	DataBag bag = new DataBag(Datum.DataType.INT);
-
-	assertEquals("getType", Datum.DataType.BAG, bag.getType());
-	assertFalse("is null", bag.isNull());
-	assertTrue("bag of ints", bag.bagOf() == Datum.DataType.INT);
-
-	assertEquals("Default constructor size before", 0, bag.size());
-	DataInteger val = new DataInteger(42);
-
-	bag.add(val);
-	assertEquals("Default constructor size after", 1, bag.size());
-
-	Iterator<Datum> i = bag.content();
-	Datum d = i.next();
-
-	assertTrue("should be an integer", d.getType() == Datum.DataType.INT);
-	assertNotNull("get with entry in bag", d);
-	assertEquals("value of val", 42, ((DataInteger)d).get());
-}
-
-public void testListConstructor() throws Exception
-{
-	List<Datum> list = new ArrayList<Datum>();
-	list.add(new DataInteger(10));
-	list.add(new DataInteger(11));
-	list.add(new DataInteger(9));
-
-	DataBag bag = new DataBag(list);
-
-	assertEquals("list construct size", 3L, bag.size());
-
-	Iterator<Datum> i = bag.content();
-	Datum d = i.next();
-	assertNotNull("get first entry in bag", d);
-	assertTrue("should be an integer", d.getType() == Datum.DataType.INT);
-	assertEquals("first value of val", 10, ((DataInteger)d).get());
-	d = i.next();
-	assertNotNull("get second entry in bag", d);
-	assertTrue("should be an integer", d.getType() == Datum.DataType.INT);
-	assertEquals("second value of val", 11, ((DataInteger)d).get());
-	d = i.next();
-	assertNotNull("get third entry in bag", d);
-	assertTrue("should be an integer", d.getType() == Datum.DataType.INT);
-	assertEquals("third value of val", 9, ((DataInteger)d).get());
-	assertFalse("bag should be exhausted now", i.hasNext());
-
-	bag.add(new DataInteger(4));
-	i = bag.content();
-	d = i.next();
-	d = i.next();
-	d = i.next();
-	d = i.next();
-	assertNotNull("get fourth entry in bag", d);
-	assertTrue("should be an integer", d.getType() == Datum.DataType.INT);
-	assertEquals("fourth value of val", 4, ((DataInteger)d).get());
-	assertFalse("bag should be exhausted now", i.hasNext());
-}
-
-
-public void testBigBag() throws Exception
-{
-	DataBag bag = new DataBag(Datum.DataType.INT);
-
-	for (int i = 0; i < 10000; i++) {
-		bag.add(new DataInteger(i));
-	}
-
-	assertEquals("big size after loading", 10000, bag.size());
-
-	Iterator<Datum> i = bag.content();
-	for (int j = 0; j < 10000; j++) {
-		assertTrue("should still have data", i.hasNext());
-		Datum val = i.next();
-		assertTrue("should be an integer", val.getType() == Datum.DataType.INT);
-		assertEquals("value of val", j, ((DataInteger)val).get());
-	}
-	assertFalse("bag should be exhausted now", i.hasNext());
-}
-
-public void testToString() throws Exception
-{
-	DataBag bag = new DataBag(Datum.DataType.INT);
-
-	bag.add(new DataInteger(1));
-	bag.add(new DataInteger(1));
-	bag.add(new DataInteger(3));
-
-	assertEquals("toString", "{1, 1, 3}", bag.toString());
-}
-
-public void testEquals() throws Exception
-{
-	DataBag bag1 = new DataBag(Datum.DataType.INT);
-	DataBag bag2 = new DataBag(Datum.DataType.INT);
-
-	bag1.add(new DataInteger(3));
-	bag2.add(new DataInteger(3));
-
-	assertFalse("different object", bag1.equals(new String()));
-
-	assertTrue("same data", bag1.equals(bag2));
-
-	bag2 = new DataBag(Datum.DataType.INT);
-	bag2.add(new DataInteger(4));
-	assertFalse("different data", bag1.equals(bag2));
-
-	bag2 = new DataBag(Datum.DataType.INT);
-	bag2.add(new DataInteger(3));
-	bag2.add(new DataInteger(3));
-	assertFalse("different size", bag1.equals(bag2));
-
-	bag2 = new DataBag(Datum.DataType.LONG);
-	bag2.add(new DataLong(3));
-	assertFalse("different type of bag", bag1.equals(bag2));
-}
-
-public void testCompareTo() throws Exception
-{
-	DataBag bag1 = new DataBag(Datum.DataType.INT);
-	DataBag bag2 = new DataBag(Datum.DataType.INT);
-
-	bag1.add(new DataInteger(3));
-	bag2.add(new DataInteger(3));
-
-	assertEquals("different object less than", -1, bag1.compareTo(new String()));
-
-	Tuple t = new Tuple();
-	assertTrue("less than tuple", bag1.compareTo(t) < 0);
-	DataMap map = new DataMap();
-	assertTrue("less than map", bag1.compareTo(map) < 0);
-	DataLong l = new DataLong();
-	assertTrue("less than long", bag1.compareTo(l) < 0);
-	DataFloat f = new DataFloat();
-	assertTrue("less than float", bag1.compareTo(f) < 0);
-	DataDouble d = new DataDouble();
-	assertTrue("less than double", bag1.compareTo(d) < 0);
-	DataUnknown unk = new DataUnknown();
-	assertTrue("less than unknown", bag1.compareTo(unk) < 0);
-	DataCharArrayUtf16 utf16 = new DataCharArrayUtf16();
-	assertTrue("less than utf16", bag1.compareTo(utf16) < 0);
-
-	assertEquals("same data equal", 0,  bag1.compareTo(bag2));
-
-	bag2 = new DataBag(Datum.DataType.INT);
-	bag2.add(new DataInteger(2));
-	assertEquals("greater than bag with lesser value", 1, bag1.compareTo(bag2));
-
-	bag2 = new DataBag(Datum.DataType.INT);
-	bag2.add(new DataInteger(4));
-	assertEquals("less than bag with greater value", -1, bag1.compareTo(bag2));
-
-	bag2 = new DataBag(Datum.DataType.INT);
-	bag2.add(new DataInteger(3));
-	bag2.add(new DataInteger(4));
-	assertEquals("less than bigger bag", -1, bag1.compareTo(bag2));
-
-	bag2 = new DataBag(Datum.DataType.INT);
-	assertEquals("greater than smaller bag", 1, bag1.compareTo(bag2));
-
-	bag2 = new DataBag(Datum.DataType.LONG);
-	bag2.add(new DataLong(3));
-	assertEquals("different type of bag", -1, bag1.compareTo(bag2));
-}
-
-
-public void testWriteReadUnknown() throws Exception
-{
-	DataBag before = new DataBag(Datum.DataType.UNKNOWN);
-
-	String s = new String("zzz");
-	before.add(new DataUnknown(s.getBytes()));
-	s = new String("yyy");
-	before.add(new DataUnknown(s.getBytes()));
-	s = new String("xxx");
-	before.add(new DataUnknown(s.getBytes()));
-
-	File file = null;
-	file = File.createTempFile("DataBagUnknown", "put");
-	FileOutputStream fos = new FileOutputStream(file);
-	DataOutput out = new DataOutputStream(fos);
-	before.write(out);
-	fos.close();
-
-	FileInputStream fis = new FileInputStream(file);
-	DataInput in = new DataInputStream(fis);
-	Datum a = DatumImpl.readDatum(in);
-
-	assertTrue("isa DataBag", a instanceof DataBag);
-
-	DataBag after = (DataBag)a;
-
-	assertTrue("bag of unknowns", after.bagOf() == Datum.DataType.UNKNOWN);
-	assertEquals("after read, size", 3, after.size()); 
-
-	Iterator<Datum> j = after.content();
-
-	Datum valAfter = j.next();
-	assertTrue("should be an unknown",
-		valAfter.getType() == Datum.DataType.UNKNOWN);
-	for (int i = 0; i < 3; i++) {
-		assertEquals("value of valAfter", (byte)0x7a,
-			((DataUnknown)valAfter).get()[i]);
-	}
-
-	valAfter = j.next();
-	assertTrue("should be an unknown",
-		valAfter.getType() == Datum.DataType.UNKNOWN);
-	for (int i = 0; i < 3; i++) {
-		assertEquals("value of valAfter", (byte)0x79,
-			((DataUnknown)valAfter).get()[i]);
-	}
-
-	valAfter = j.next();
-	assertTrue("should be an unknown",
-		valAfter.getType() == Datum.DataType.UNKNOWN);
-	for (int i = 0; i < 3; i++) {
-		assertEquals("value of valAfter", (byte)0x78,
-			((DataUnknown)valAfter).get()[i]);
-	}
-
-	assertFalse("should have read all values in bag", j.hasNext());
-	
-	file.delete();
-}
-
-public void testWriteReadInt() throws Exception
-{
-	DataBag before = new DataBag(Datum.DataType.INT);
-
-	before.add(new DataInteger(99));
-	before.add(new DataInteger(-98));
-	before.add(new DataInteger(97));
-
-	File file = null;
-	file = File.createTempFile("DataBagInteger", "put");
-	FileOutputStream fos = new FileOutputStream(file);
-	DataOutput out = new DataOutputStream(fos);
-	before.write(out);
-	fos.close();
-
-	FileInputStream fis = new FileInputStream(file);
-	DataInput in = new DataInputStream(fis);
-	Datum a = DatumImpl.readDatum(in);
-
-	assertTrue("isa DataBag", a instanceof DataBag);
-
-	DataBag after = (DataBag)a;
-
-	assertTrue("bag of ints", after.bagOf() == Datum.DataType.INT);
-
-	assertEquals("after read, size", 3, after.size()); 
-
-	Iterator<Datum> j = after.content();
-
-	Datum val = j.next();
-	assertTrue("should be an integer", val.getType() == Datum.DataType.INT);
-	assertEquals("value of valAfter", 99, ((DataInteger)val).get());
-
-	val = j.next();
-	assertTrue("should be an integer", val.getType() == Datum.DataType.INT);
-	assertEquals("value of valAfter2", -98, ((DataInteger)val).get());
-
-	val = j.next();
-	assertTrue("should be an integer", val.getType() == Datum.DataType.INT);
-	assertEquals("value of valAfter", 97, ((DataInteger)val).get());
-
-	assertFalse("should have read all values in bag", j.hasNext());
-		
-	file.delete();
-}
-
-public void testWriteReadLong() throws Exception
-{
-	DataBag before = new DataBag(Datum.DataType.LONG);
-
-	before.add(new DataLong(99000000000L));
-	before.add(new DataLong(-98L));
-	before.add(new DataLong(97L));
-
-	File file = null;
-	file = File.createTempFile("DataBagLong", "put");
-	FileOutputStream fos = new FileOutputStream(file);
-	DataOutput out = new DataOutputStream(fos);
-	before.write(out);
-	fos.close();
-
-	FileInputStream fis = new FileInputStream(file);
-	DataInput in = new DataInputStream(fis);
-	Datum a = DatumImpl.readDatum(in);
-
-	assertTrue("isa DataBag", a instanceof DataBag);
-
-	DataBag after = (DataBag)a;
-
-	assertTrue("bag of longs", after.bagOf() == Datum.DataType.LONG);
-	assertEquals("after read, size", 3, after.size()); 
-
-	Iterator<Datum> j = after.content();
-
-	Datum val = j.next();
-	assertTrue("should be a long", val.getType() == Datum.DataType.LONG);
-	assertEquals("value of valAfter", 99000000000L, ((DataLong)val).get());
-
-	val = j.next();
-	assertTrue("should be a long", val.getType() == Datum.DataType.LONG);
-	assertEquals("value of valAfter2", -98L, ((DataLong)val).get());
-
-	val = j.next();
-	assertTrue("should be a long", val.getType() == Datum.DataType.LONG);
-	assertEquals("value of valAfter", 97L, ((DataLong)val).get());
-
-	assertFalse("should have read all values in bag", j.hasNext());
-		
-	file.delete();
-}
-
-public void testWriteReadFloat() throws Exception
-{
-	DataBag before = new DataBag(Datum.DataType.FLOAT);
-
-	before.add(new DataFloat(3.2e32f));
-	before.add(new DataFloat(-9.929292e-29f));
-	before.add(new DataFloat(97.0f));
-
-	File file = null;
-	file = File.createTempFile("DataBagFloat", "put");
-	FileOutputStream fos = new FileOutputStream(file);
-	DataOutput out = new DataOutputStream(fos);
-	before.write(out);
-	fos.close();
-
-	FileInputStream fis = new FileInputStream(file);
-	DataInput in = new DataInputStream(fis);
-	Datum a = DatumImpl.readDatum(in);
-
-	assertTrue("isa DataBag", a instanceof DataBag);
-
-	DataBag after = (DataBag)a;
-
-	assertTrue("bag of floats", after.bagOf() == Datum.DataType.FLOAT);
-	assertEquals("after read, size", 3, after.size()); 
-
-	Iterator<Datum> j = after.content();
-
-	Datum val = j.next();
-	assertTrue("should be a float", val.getType() == Datum.DataType.FLOAT);
-	assertEquals("value of valAfter", 3.2e32f, ((DataFloat)val).get());
+public class TestDataBag extends junit.framework.TestCase {
 
-	val = j.next();
-	assertTrue("should be a float", val.getType() == Datum.DataType.FLOAT);
-	assertEquals("value of valAfter2", -9.929292e-29f, ((DataFloat)val).get());
+    private Random rand = new Random();
 
-	val = j.next();
-	assertTrue("should be a float", val.getType() == Datum.DataType.FLOAT);
-	assertEquals("value of valAfter", 97.0f, ((DataFloat)val).get());
+    private class TestMemoryManager {
+        ArrayList<Spillable> mManagedObjects = new ArrayList<Spillable>();
 
-	assertFalse("should have read all values in bag", j.hasNext());
-		
-	file.delete();
-}
-
-public void testWriteReadDouble() throws Exception
-{
-	DataBag before = new DataBag(Datum.DataType.DOUBLE);
-
-	before.add(new DataDouble(3.2e132));
-	before.add(new DataDouble(-9.929292e-129));
-	before.add(new DataDouble(97.0));
-
-	File file = null;
-	file = File.createTempFile("DataBagDouble", "put");
-	FileOutputStream fos = new FileOutputStream(file);
-	DataOutput out = new DataOutputStream(fos);
-	before.write(out);
-	fos.close();
-
-	FileInputStream fis = new FileInputStream(file);
-	DataInput in = new DataInputStream(fis);
-	Datum a = DatumImpl.readDatum(in);
-
-	assertTrue("isa DataBag", a instanceof DataBag);
-
-	DataBag after = (DataBag)a;
-
-	assertTrue("bag of double", after.bagOf() == Datum.DataType.DOUBLE);
-	assertEquals("after read, size", 3, after.size()); 
-
-	Iterator<Datum> j = after.content();
-
-	Datum val = j.next();
-	assertTrue("should be a double", val.getType() == Datum.DataType.DOUBLE);
-	assertEquals("value of valAfter", 3.2e132, ((DataDouble)val).get());
-
-	val = j.next();
-	assertTrue("should be a double", val.getType() == Datum.DataType.DOUBLE);
-	assertEquals("value of valAfter2", -9.929292e-129, ((DataDouble)val).get());
-
-	val = j.next();
-	assertTrue("should be a double", val.getType() == Datum.DataType.DOUBLE);
-	assertEquals("value of valAfter", 97.0, ((DataDouble)val).get());
-
-	assertFalse("should have read all values in bag", j.hasNext());
-		
-	file.delete();
-}
-
-public void testWriteReadUtf16() throws Exception
-{
-	DataBag before = new DataBag(Datum.DataType.CHARARRAY);
-
-	before.add(new DataCharArrayUtf16("zzz"));
-	before.add(new DataCharArrayUtf16("yyy"));
-	before.add(new DataCharArrayUtf16("xxx"));
-
-	File file = null;
-	file = File.createTempFile("DataBagUtf16", "put");
-	FileOutputStream fos = new FileOutputStream(file);
-	DataOutput out = new DataOutputStream(fos);
-	before.write(out);
-	fos.close();
-
-	FileInputStream fis = new FileInputStream(file);
-	DataInput in = new DataInputStream(fis);
-	Datum a = DatumImpl.readDatum(in);
-
-	assertTrue("isa DataBag", a instanceof DataBag);
-
-	DataBag after = (DataBag)a;
-
-	assertTrue("bag of chararray", after.bagOf() == Datum.DataType.CHARARRAY);
-	assertEquals("after read, size", 3, after.size()); 
-
-	Iterator<Datum> j = after.content();
-
-	Datum val = j.next();
-	assertTrue("should be a chararray", val.getType() == Datum.DataType.CHARARRAY);
-	assertTrue("encoding should be utf16",
-		((DataCharArray)val).getEncoding() == DataCharArray.Encoding.UTF16);
-	assertEquals("value of valAfter", "zzz", ((DataCharArrayUtf16)val).get());
-
-	val = j.next();
-	assertTrue("should be a chararray", val.getType() == Datum.DataType.CHARARRAY);
-	assertTrue("encoding should be utf16",
-		((DataCharArray)val).getEncoding() == DataCharArray.Encoding.UTF16);
-	assertEquals("value of valAfter2", "yyy", ((DataCharArrayUtf16)val).get());
-
-	val = j.next();
-	assertTrue("should be a chararray", val.getType() == Datum.DataType.CHARARRAY);
-	assertTrue("encoding should be utf16",
-		((DataCharArray)val).getEncoding() == DataCharArray.Encoding.UTF16);
-	assertEquals("value of valAfter", "xxx", ((DataCharArrayUtf16)val).get());
-
-	assertFalse("should have read all values in bag", j.hasNext());
-		
-	file.delete();
-}
+        public void register(Spillable s) {
+            mManagedObjects.add(s);
+        }
+
+        public void forceSpill() throws IOException {
+            Iterator<Spillable> i = mManagedObjects.iterator();
+            while (i.hasNext()) i.next().spill();
+        }
+    }
+
+    // Need to override the regular bag factory so I can register with my local
+    // memory manager.
+    private class LocalBagFactory {
+        TestMemoryManager mMemMgr;
+
+        public LocalBagFactory(TestMemoryManager mgr) {
+            mMemMgr = mgr;
+        }
+
+        public DataBag newDefaultBag() {
+            DataBag bag = new DefaultDataBag();
+            mMemMgr.register(bag);
+            return bag;
+        }
+
+        public DataBag newSortedBag(EvalSpec sortSpec) {
+            DataBag bag = new SortedDataBag(sortSpec);
+            mMemMgr.register(bag);
+            return bag;
+        }
+
+        public DataBag newDistinctBag() {
+            DataBag bag = new DistinctDataBag();
+            mMemMgr.register(bag);
+            return bag;
+        }
+    }
+
+    // Test reading and writing default from memory, no spills.
+    @Test
+    public void testDefaultInMemory() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDefaultBag();
+        ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(10);
+
+        // Write tuples into both
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+   // Test reading and writing default from file with one spill
+    @Test
+    public void testDefaultSingleSpill() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDefaultBag();
+        ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(10);
+
+        // Write tuples into both
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+   // Test reading and writing default from file with three spills
+    @Test
+    public void testDefaultTripleSpill() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDefaultBag();
+        ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(30);
+
+        // Write tuples into both
+        for (int j = 0; j < 3; j++) {
+            for (int i = 0; i < 10; i++) {
+                Tuple t = new Tuple(new DataAtom(i));
+                b.add(t);
+                rightAnswer.add(t);
+            }
+            mgr.forceSpill();
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+    // Test reading with some in file, some in memory.
+    @Test
+    public void testDefaultInMemInFile() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDefaultBag();
+        ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(20);
+
+        // Write tuples into both
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+   // Test reading with a spill happening in the middle of the read.
+    @Test
+    public void testDefaultSpillDuringRead() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDefaultBag();
+        ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(20);
+
+        // Write tuples into both
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        for (int i = 0; i < 15; i++) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        mgr.forceSpill();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+    // Test reading and writing sorted from memory, no spills.
+    @Test
+    public void testSortedInMemory() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newSortedBag(null);
+        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(10);
+
+        // Write tuples into both
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+
+        Tuple t;
+        while ((t = rightAnswer.poll()) != null) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), t);
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+   // Test reading and writing sorted from file with one spill
+    @Test
+    public void testSortedSingleSpill() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newSortedBag(null);
+        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(10);
+
+        // Write tuples into both
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+        Tuple t;
+        while ((t = rightAnswer.poll()) != null) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), t);
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+   // Test reading and writing sorted from file with three spills
+    @Test
+    public void testSortedTripleSpill() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newSortedBag(null);
+        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(30);
+
+        // Write tuples into both
+        for (int j = 0; j < 3; j++) {
+            for (int i = 0; i < 10; i++) {
+                Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+                b.add(t);
+                rightAnswer.add(t);
+            }
+            mgr.forceSpill();
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+
+        Tuple t;
+        while ((t = rightAnswer.poll()) != null) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), t);
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+    // Test reading a sorted bag with some tuples spilled and some added after the spill.
+    @Test
+    public void testSortedInMemInFile() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newSortedBag(null); // NOTE(review): null presumably selects the default ordering -- confirm
+        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20); // reference ordering the bag must match
+
+        // Write the first 10 tuples into both, then force a spill
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        for (int i = 0; i < 10; i++) { // second batch of 10 is added after the spill and is not spilled
+            Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back; the bag must yield them in the order the queue polls them.
+        Iterator<Tuple> bIter = b.iterator();
+        Tuple t;
+        while ((t = rightAnswer.poll()) != null) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), t);
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext()); // the bag must not hold more tuples than the queue did
+    }
+
+    // Test reading a sorted bag when a second spill is forced part-way through iteration.
+    @Test
+    public void testSortedSpillDuringRead() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newSortedBag(null); // NOTE(review): null presumably selects the default ordering -- confirm
+        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20); // reference ordering the bag must match
+
+        // Write the first 10 tuples into both, then force a spill
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        for (int i = 0; i < 10; i++) { // second batch of 10 is added after the first spill
+            Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Start reading; the bag must yield tuples in the order the queue polls them.
+        Iterator<Tuple> bIter = b.iterator();
+
+        for (int i = 0; i < 15; i++) { // consume 15 of the 20 tuples before the next spill
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rightAnswer.poll());
+        }
+
+        mgr.forceSpill(); // spill while bIter is still open; the remaining reads must be unaffected
+
+        Tuple t;
+        while ((t = rightAnswer.poll()) != null) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), t);
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext()); // the bag must not hold more tuples than the queue did
+    }
+
+    // Test reading a sorted bag when its very first spill is forced part-way through iteration.
+    @Test
+    public void testSortedFirstSpillDuringRead() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newSortedBag(null); // NOTE(review): null presumably selects the default ordering -- confirm
+        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20); // reference ordering the bag must match
+
+        for (int i = 0; i < 10; i++) { // all 10 tuples are added with no spill beforehand
+            Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Start reading; the bag must yield tuples in the order the queue polls them.
+        Iterator<Tuple> bIter = b.iterator();
+
+        for (int i = 0; i < 5; i++) { // consume 5 of the 10 tuples before the spill
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rightAnswer.poll());
+        }
+
+        mgr.forceSpill(); // first spill, forced while bIter is still open
+
+        Tuple t;
+        while ((t = rightAnswer.poll()) != null) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), t);
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext()); // the bag must not hold more tuples than the queue did
+    }
+
+   // Test reading and writing a sorted bag with so many spills (373) that it
+   // requires a premerge.
+    @Test
+    public void testSortedPreMerge() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newSortedBag(null); // NOTE(review): null presumably selects the default ordering -- confirm
+        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(30); // 30 is only the initial capacity; 3730 tuples are added
+
+        // Write the same tuples into the bag and the reference queue
+        for (int j = 0; j < 373; j++) {
+            for (int i = 0; i < 10; i++) {
+                Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+                b.add(t);
+                rightAnswer.add(t);
+            }
+            mgr.forceSpill(); // one forced spill per batch of 10, 373 in total
+        }
+
+        // Read tuples back; the bag must yield them in the order the queue polls them.
+        Iterator<Tuple> bIter = b.iterator();
+
+        Tuple t;
+        while ((t = rightAnswer.poll()) != null) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), t);
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext()); // the bag must not hold more tuples than the queue did
+    }
+
+    // Test reading and writing a distinct bag with no forced spills.
+    @Test
+    public void testDistinctInMemory() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>(); // reference: de-duplicated, sorted set of the same tuples
+
+        // Write the same tuples into the bag and the reference set
+        for (int i = 0; i < 50; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5)); // % 5 keeps the value range small -- presumably to force duplicates
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back; the test expects the bag to iterate in the TreeSet's order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext()); // the bag must not hold more tuples than the set did
+    }
+
+   // Test reading and writing a distinct bag after one forced spill
+    @Test
+    public void testDistinctSingleSpill() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>(); // reference: de-duplicated, sorted set of the same tuples
+
+        // Write the same tuples into the bag and the reference set
+        for (int i = 0; i < 50; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5)); // % 5 keeps the value range small -- presumably to force duplicates
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill(); // single forced spill after all 50 adds
+
+        // Read tuples back; the test expects the bag to iterate in the TreeSet's order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext()); // the bag must not hold more tuples than the set did
+    }
+
+   // Test reading and writing a distinct bag after three forced spills
+    @Test
+    public void testDistinctTripleSpill() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>(); // reference: de-duplicated, sorted set of the same tuples
+
+        // Write the same tuples into the bag and the reference set
+        for (int j = 0; j < 3; j++) {
+            for (int i = 0; i < 50; i++) {
+                Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5)); // small value range, so batches can repeat values
+                b.add(t);
+                rightAnswer.add(t);
+            }
+            mgr.forceSpill(); // one forced spill per batch of 50, three in total
+        }
+
+        // Read tuples back; the test expects the bag to iterate in the TreeSet's order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext()); // the bag must not hold more tuples than the set did
+    }
+
+    // Test reading a distinct bag with some tuples spilled and some added after the spill.
+    @Test
+    public void testDistinctInMemInFile() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>(); // reference: de-duplicated, sorted set of the same tuples
+
+        // Write the first 50 tuples into both, then force a spill
+        for (int i = 0; i < 50; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5)); // % 5 keeps the value range small -- presumably to force duplicates
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        for (int i = 0; i < 50; i++) { // second batch, added after the spill
+            Tuple t = new Tuple(new DataAtom(i)); // values 0..49; 0..4 may repeat values from the first batch
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back; the test expects the bag to iterate in the TreeSet's order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext()); // the bag must not hold more tuples than the set did
+    }
+
+   // Test reading a distinct bag when a second spill is forced part-way through iteration.
+    @Test
+    public void testDistinctSpillDuringRead() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>(); // reference: de-duplicated, sorted set of the same tuples
+
+        // Write the first 50 tuples into both, then force a spill
+        for (int i = 0; i < 50; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5)); // % 5 keeps the value range small -- presumably to force duplicates
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        for (int i = 0; i < 50; i++) { // second batch, added after the first spill
+            Tuple t = new Tuple(new DataAtom(i)); // values 0..49; 0..4 may repeat values from the first batch
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Start reading; the test expects the bag to iterate in the TreeSet's order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        for (int i = 0; i < 5; i++) { // consume the first 5 tuples before the next spill
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        mgr.forceSpill(); // spill while bIter is still open; the remaining reads must be unaffected
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext()); // the bag must not hold more tuples than the set did
+    }
+
+   // Test reading and writing a distinct bag with enough spills (321) to
+   // force a pre-merge
+    @Test
+    public void testDistinctPreMerge() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>(); // reference: de-duplicated, sorted set of the same tuples
+
+        // Write the same tuples into the bag and the reference set
+        for (int j = 0; j < 321; j++) {
+            for (int i = 0; i < 50; i++) {
+                Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5)); // small value range, so batches can repeat values
+                b.add(t);
+                rightAnswer.add(t);
+            }
+            mgr.forceSpill(); // one forced spill per batch of 50, 321 in total
+        }
+
+        // Read tuples back; the test expects the bag to iterate in the TreeSet's order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext()); // the bag must not hold more tuples than the set did
+    }
+
+    // Test that the factory from BagFactory.getInstance() returns the default bag classes.
+    @Test
+    public void testDefaultBagFactory() throws Exception {
+        BagFactory f = BagFactory.getInstance();
+
+        DataBag bag = f.newDefaultBag();
+        DataBag sorted = f.newSortedBag(null); // NOTE(review): null presumably selects the default ordering -- confirm
+        DataBag distinct = f.newDistinctBag();
+
+        assertTrue("Expected a default bag", (bag instanceof DefaultDataBag));
+        assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag));
+        assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
+    }
+
+    @Test
+    public void testProvidedBagFactory() throws Exception {
+        // Case 1: bogus factory class name -- getInstance() must throw a RuntimeException.
+        BagFactory.resetSelf(); // presumably drops the cached factory so the properties below are re-read -- confirm
+        System.setProperty("pig.data.bag.factory.name", "no such class");
+        System.setProperty("pig.data.bag.factory.jar", "file:./pig.jar");
+        boolean caughtIt = false;
+        try {
+            BagFactory f = BagFactory.getInstance(); // result unused; only the exception matters
+        } catch (RuntimeException re) {
+            assertEquals("Expected Unable to instantiate message",
+                "Unable to instantiate bag factory no such class",
+                re.getMessage());
+            caughtIt = true;
+        }
+        assertTrue("Expected to catch exception", caughtIt);
+
+        // Case 2: a class that exists but isn't a BagFactory (this test class itself).
+        BagFactory.resetSelf();
+        System.setProperty("pig.data.bag.factory.name",
+            "org.apache.pig.test.TestDataBag");
+        System.setProperty("pig.data.bag.factory.jar",
+            "file:./pig.jar");
+        caughtIt = false;
+        try {
+            BagFactory f = BagFactory.getInstance(); // result unused; only the exception matters
+        } catch (RuntimeException re) {
+            assertEquals("Expected does not extend BagFactory message", 
+                "Provided factory org.apache.pig.test.TestDataBag does not extend BagFactory!",
+                re.getMessage());
+            caughtIt = true;
+        }
+        assertTrue("Expected to catch exception", caughtIt);
+
+        // Case 3: a valid custom factory can be instantiated and asked for each bag kind.
+        BagFactory.resetSelf();
+        System.setProperty("pig.data.bag.factory.name",
+            "org.apache.pig.test.NonDefaultBagFactory");
+        System.setProperty("pig.data.bag.factory.jar", "file:./pig.jar"); // NOTE(review): these properties are never cleared by this test -- confirm they cannot affect later tests
+        BagFactory f = BagFactory.getInstance();
+        DataBag b = f.newDefaultBag(); // nothing is asserted on b; these three calls only need to not throw
+        b = f.newSortedBag(null);
+        b = f.newDistinctBag();
 
-public void testWriteReadNone() throws Exception
-{
-	DataBag before = new DataBag(Datum.DataType.CHARARRAY);
-
-	String s = new String("zzz");
-	before.add(new DataCharArrayNone(s.getBytes()));
-	s = new String("yyy");
-	before.add(new DataCharArrayNone(s.getBytes()));
-	s = new String("xxx");
-	before.add(new DataCharArrayNone(s.getBytes()));
-
-	File file = null;
-	file = File.createTempFile("DataBagCharArrayNone", "put");
-	FileOutputStream fos = new FileOutputStream(file);
-	DataOutput out = new DataOutputStream(fos);
-	before.write(out);
-	fos.close();
-
-	FileInputStream fis = new FileInputStream(file);
-	DataInput in = new DataInputStream(fis);
-	Datum a = DatumImpl.readDatum(in);
-
-	assertTrue("isa DataBag", a instanceof DataBag);
-
-	DataBag after = (DataBag)a;
-
-	assertTrue("bag of chararray", after.bagOf() == Datum.DataType.CHARARRAY);
-	assertEquals("after read, size", 3, after.size()); 
-
-	Iterator<Datum> j = after.content();
-
-	Datum valAfter = j.next();
-	assertTrue("should be a chararray", valAfter.getType() == Datum.DataType.CHARARRAY);
-	assertTrue("encoding should be none",
-		((DataCharArray)valAfter).getEncoding() == DataCharArray.Encoding.NONE);
-	for (int i = 0; i < 3; i++) {
-		assertEquals("value of valAfter", (byte)0x7a,
-			((DataCharArrayNone)valAfter).get()[i]);
-	}
-
-	valAfter = j.next();
-	assertTrue("should be a chararray", valAfter.getType() == Datum.DataType.CHARARRAY);
-	assertTrue("encoding should be none",
-		((DataCharArray)valAfter).getEncoding() == DataCharArray.Encoding.NONE);
-	for (int i = 0; i < 3; i++) {
-		assertEquals("value of valAfter", (byte)0x79,
-			((DataCharArrayNone)valAfter).get()[i]);
-	}
-
-	valAfter = j.next();
-	assertTrue("should be a chararray", valAfter.getType() == Datum.DataType.CHARARRAY);
-	assertTrue("encoding should be none",
-		((DataCharArray)valAfter).getEncoding() == DataCharArray.Encoding.NONE);
-	for (int i = 0; i < 3; i++) {
-		assertEquals("value of valAfter", (byte)0x78,
-			((DataCharArrayNone)valAfter).get()[i]);
-	}
-
-	assertFalse("should have read all values in bag", j.hasNext());
-	
-	file.delete();
+        BagFactory.resetSelf();
+    }
 }
 
-public void testWriteReadMap() throws Exception
-{
-	DataBag before = new DataBag(Datum.DataType.MAP);
-
-	DataMap map = new DataMap();
-
-	DataInteger key = new DataInteger(1);
-	Datum val = new DataInteger(99);
-	map.put(key, val);
-
-	before.add(map);
-
-	File file = null;
-	file = File.createTempFile("DataBagCharArrayNone", "put");
-	FileOutputStream fos = new FileOutputStream(file);
-	DataOutput out = new DataOutputStream(fos);
-	before.write(out);
-	fos.close();
-
-	FileInputStream fis = new FileInputStream(file);
-	DataInput in = new DataInputStream(fis);
-	Datum a = DatumImpl.readDatum(in);
-
-	assertTrue("isa DataBag", a instanceof DataBag);
-
-	DataBag after = (DataBag)a;
-
-	assertTrue("bag of maps", after.bagOf() == Datum.DataType.MAP);
-	assertEquals("after read, size", 1, after.size()); 
-
-	Iterator<Datum> j = after.content();
-
-	Datum v = j.next();
-	assertTrue("valAfter should be a map", v.getType() == Datum.DataType.MAP);
-	DataMap valAfter = (DataMap)v;
-
-	assertEquals("valAfter size", 1L, valAfter.size());
-
-	DataInteger nosuch = new DataInteger(-1);
-	Datum d = valAfter.get(nosuch);
-	assertTrue("after read, no such key", d.isNull());
-
-	Datum mapValAfter = valAfter.get(key);
-	assertTrue("mapValAfter isa integer", mapValAfter instanceof DataInteger);
-	assertEquals("value of valAfter", 99, ((DataInteger)mapValAfter).get());
-
-	assertFalse("should have read all values in bag", j.hasNext());
-	
-	file.delete();
-}
-
-public void testWriteReadTuple() throws Exception
-{
-	DataBag before = new DataBag(Datum.DataType.TUPLE);
-
-	Tuple t = new Tuple(1);
-	t.setField(0, new DataInteger(1));
-	before.add(t);
-
-	File file = null;
-	file = File.createTempFile("DataBagCharArrayNone", "put");
-	FileOutputStream fos = new FileOutputStream(file);
-	DataOutput out = new DataOutputStream(fos);
-	before.write(out);
-	fos.close();
-
-	FileInputStream fis = new FileInputStream(file);
-	DataInput in = new DataInputStream(fis);
-	Datum a = DatumImpl.readDatum(in);
-
-	assertTrue("isa DataBag", a instanceof DataBag);
-
-	DataBag after = (DataBag)a;
-
-	assertTrue("bag of tuples", after.bagOf() == Datum.DataType.TUPLE);
-	assertEquals("after read, size", 1, after.size()); 
-
-	Iterator<Datum> j = after.content();
-
-	Datum v = j.next();
-	assertTrue("valAfter should be a tuple",
-		v.getType() == Datum.DataType.TUPLE);
-
-	Tuple valAfter = (Tuple)v;
-
-	assertEquals("valAfter size", 1L, valAfter.size());
-
-	Datum tupleValAfter = valAfter.getField(0);
-	assertTrue("tupleValAfter isa integer", tupleValAfter instanceof DataInteger);
-	assertEquals("value of valAfter", 1, ((DataInteger)tupleValAfter).get());
-
-	assertFalse("should have read all values in bag", j.hasNext());
-	
-	file.delete();
-}
-
-public void testWriteReadBag() throws Exception
-{
-	DataBag before = new DataBag(Datum.DataType.BAG);
-
-	DataBag b = new DataBag(Datum.DataType.INT);
-	b.add(new DataInteger(2));
-	before.add(b);
-
-	File file = null;
-	file = File.createTempFile("DataBagCharArrayNone", "put");
-	FileOutputStream fos = new FileOutputStream(file);
-	DataOutput out = new DataOutputStream(fos);
-	before.write(out);
-	fos.close();
-
-	FileInputStream fis = new FileInputStream(file);
-	DataInput in = new DataInputStream(fis);
-	Datum a = DatumImpl.readDatum(in);
-
-	assertTrue("isa DataBag", a instanceof DataBag);
-
-	DataBag after = (DataBag)a;
-
-	assertTrue("bag of bags", after.bagOf() == Datum.DataType.BAG);
-	assertEquals("after read, size", 1, after.size()); 
-
-	Iterator<Datum> j = after.content();
-
-	Datum v = j.next();
-	assertTrue("valAfter should be a bag", v.getType() == Datum.DataType.BAG);
-	DataBag valAfter = (DataBag)v;
-
-	assertEquals("valAfter size", 1L, valAfter.size());
-
-	Iterator<Datum> k = valAfter.content();
-	Datum w = k.next();
-	assertTrue("bagValAfter should be an integer",
-		w.getType() == Datum.DataType.INT);
-	DataInteger bagValAfter = (DataInteger)w;
-
-	assertEquals("value of valAfter", 2, bagValAfter.get());
-
-	assertFalse("should have read all values in inner bag", k.hasNext());
-	assertFalse("should have read all values in bag", j.hasNext());
-	
-	file.delete();
-}
-
-}
 
 
-