You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/04 23:58:23 UTC

svn commit: r609048 [2/2] - in /incubator/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/ src/org/apache/pig/impl/eval/collec...

Modified: incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java Fri Jan  4 14:58:20 2008
@@ -56,6 +56,7 @@
 import org.apache.pig.impl.mapreduceExec.MapReduceLauncher;
 import org.apache.pig.impl.mapreduceExec.PigMapReduce;
 import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.PigLogger;
 import org.apache.pig.shock.SSHSocketImplFactory;
 
 
@@ -90,7 +91,6 @@
     //  connection to hadoop jobtracker (stays as null if doing local execution)
     transient private JobSubmissionProtocol jobTracker;                  
     transient private JobClient jobClient;                  
-	transient private Logger                mLogger;
    
     private String jobName = JOB_NAME_PREFIX;	// can be overwritten by users
   
@@ -110,8 +110,6 @@
 	public PigContext(ExecType execType){
 		this.execType = execType;
 		
-		mLogger = Logger.getLogger("org.apache.pig");
-
     	initProperties();
     	
         String pigJar = JarManager.findContainingJar(Main.class);
@@ -131,6 +129,8 @@
     }
 
 	private void initProperties() {
+        Logger log = PigLogger.getLogger();
+
 	    Properties fileProperties = new Properties();
 	        
 	    try{        
@@ -155,15 +155,16 @@
 	    //Now set these as system properties only if they are not already defined.
 	    for (Object o: fileProperties.keySet()){
 	    	String propertyName = (String)o;
-			mLogger.debug("Found system property " + propertyName + " in .pigrc"); 
+			log.debug("Found system property " + propertyName + " in .pigrc"); 
 	    	if (System.getProperty(propertyName) == null){
 	    		System.setProperty(propertyName, fileProperties.getProperty(propertyName));
-				mLogger.debug("Setting system property " + propertyName);
+				log.debug("Setting system property " + propertyName);
 	    	}
 	    }
 	}    
 	
     public void connect(){
+        Logger log = PigLogger.getLogger();
     	try{
 		if (execType != ExecType.LOCAL){
 		    	//First set the ssh socket factory
@@ -199,10 +200,10 @@
 		     
 	            lfs = FileSystem.getNamed("local", conf);
 	       
-	            mLogger.info("Connecting to hadoop file system at: " + conf.get("fs.default.name"));
+	            log.info("Connecting to hadoop file system at: " + conf.get("fs.default.name"));
 	            dfs = FileSystem.get(conf);
 	        
-	            mLogger.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker"));
+	            log.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker"));
 	            jobTracker = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
 	            				JobSubmissionProtocol.versionID, JobTracker.getAddress(conf), conf);
 		    jobClient = new JobClient(conf);
@@ -233,6 +234,7 @@
     };
     
     private String[] doHod(String server) {
+        Logger log = PigLogger.getLogger();
     	if (hodMapRed != null) {
     		return new String[] {hodHDFS, hodMapRed};
     	}
@@ -250,7 +252,10 @@
             cmd.append(System.getProperty("hod.command"));
             //String cmd = System.getProperty("hod.command", "/home/breed/startHOD.expect");
 			String cluster = System.getProperty("yinst.cluster");
-			if (cluster.length() > 0 && !cluster.startsWith("kryptonite")) {
+            // TODO: This is a Yahoo-specific holdover and needs to be
+            // removed.
+			if (cluster != null && cluster.length() > 0 &&
+                    !cluster.startsWith("kryptonite")) {
 				cmd.append(" --config=");
 				cmd.append(System.getProperty("hod.config.dir"));
 				cmd.append('/');
@@ -264,8 +269,8 @@
             	p = fac.ssh(cmd.toString());
             }
             InputStream is = p.getInputStream();
-            mLogger.info("Connecting to HOD...");
-			mLogger.debug("sending HOD command " + cmd.toString());
+            log.info("Connecting to HOD...");
+			log.debug("sending HOD command " + cmd.toString());
             StringBuffer sb = new StringBuffer();
             int c;
             String hdfsUI = null;
@@ -279,23 +284,23 @@
                 	switch(current) {
                 	case HDFSUI:
                 		hdfsUI = sb.toString().trim();
-                		mLogger.info("HDFS Web UI: " + hdfsUI);
+                		log.info("HDFS Web UI: " + hdfsUI);
                 		break;
                 	case HDFS:
                 		hdfs = sb.toString().trim();
-                		mLogger.info("HDFS: " + hdfs);
+                		log.info("HDFS: " + hdfs);
                 		break;
                 	case MAPREDUI:
                 		mapredUI = sb.toString().trim();
-                		mLogger.info("JobTracker Web UI: " + mapredUI);
+                		log.info("JobTracker Web UI: " + mapredUI);
                 		break;
                 	case MAPRED:
                 		mapred = sb.toString().trim();
-                		mLogger.info("JobTracker: " + mapred);
+                		log.info("JobTracker: " + mapred);
                 		break;
 			case HADOOPCONF:
 				hadoopConf = sb.toString().trim();
-                		mLogger.info("HadoopConf: " + hadoopConf);
+                		log.info("HadoopConf: " + hadoopConf);
                 		break;
                 	}
                 	current = ParsingState.NOTHING;
@@ -341,7 +346,7 @@
 		throw new IOException("Missing Hadoop configuration file");
             return new String[] {hdfs, mapred};
         } catch (Exception e) {
-            mLogger.fatal("Could not connect to HOD", e);
+            log.fatal("Could not connect to HOD", e);
             System.exit(4);
         }
         throw new RuntimeException("Could not scrape needed information.");
@@ -415,10 +420,6 @@
     public JobConf getConf() {
         return conf;
     }
-
-	public Logger getLogger() {
-		return mLogger;
-	}
 
     public void setJobtrackerLocation(String newLocation) {
         conf.set("mapred.job.tracker", newLocation);

Modified: incubator/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java Fri Jan  4 14:58:20 2008
@@ -37,12 +37,12 @@
 		int numQuantiles = input.getAtomField(0).numval().intValue();
 		DataBag samples = input.getBagField(1);
 		
-		int numSamples = samples.cardinality();
+		long numSamples = samples.size();
 		
-		int toSkip = numSamples / numQuantiles;
+		long toSkip = numSamples / numQuantiles;
 		
-		int i=0, nextQuantile = 0;
-		Iterator<Tuple> iter = samples.content();
+		long i=0, nextQuantile = 0;
+		Iterator<Tuple> iter = samples.iterator();
 		while (iter.hasNext()){
 			Tuple t = iter.next();
 			if (i==nextQuantile){

Modified: incubator/pig/trunk/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java Fri Jan  4 14:58:20 2008
@@ -22,6 +22,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Iterator;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.data.DataBag;
@@ -47,9 +48,16 @@
 		this.cmd = cmd;
 	}
 
-	private class EndOfQueue extends DataBag{
-		public void add(Datum d){}
-	}
+    private class EndOfQueue extends DataBag {
+        @Override
+        public void add(Tuple t){}
+
+        // To satisfy abstract functions in DataBag.
+        public boolean isSorted() { return false; }
+        public boolean isDistinct() { return false; }
+        public Iterator<Tuple> iterator() { return null; }
+        public long spill() { return 0; }
+    }
 	
 	private void startProcess() throws IOException {
 		Process p = Runtime.getRuntime().exec(cmd);

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java Fri Jan  4 14:58:20 2008
@@ -21,6 +21,7 @@
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Iterator;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.Algebraic;
@@ -158,6 +159,13 @@
 		public FakeDataBag(DataCollector successor){
 			this.successor = successor;
 		}
+
+        // To satisfy abstract functions in DataBag.
+        public boolean isSorted() { return false; }
+        public boolean isDistinct() { return false; }
+        public Iterator<Tuple> iterator() { return null; }
+        public long spill() { return 0; }
+
 		
 		void addStart(){
 			successor.add(DataBag.startBag);

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java Fri Jan  4 14:58:20 2008
@@ -110,11 +110,7 @@
     	DataBag bag;
     	public DatumBag(){
     		super(null);
-    		try{
-    			bag = BagFactory.getInstance().getNewBag();
-    		}catch(IOException e){
-    			throw new RuntimeException(e);
-    		}
+    		bag = BagFactory.getInstance().newDefaultBag();
     	}
     	
     	@Override
@@ -126,7 +122,7 @@
     		return new Iterator<Datum>(){
     			Iterator<Tuple> iter;
     			{
-    				iter = bag.content();
+    				iter = bag.iterator();
     			}
     			public boolean hasNext() {
     				return iter.hasNext();

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java Fri Jan  4 14:58:20 2008
@@ -74,16 +74,11 @@
 				}else{
 					if (checkDelimiter(d)){
 						//Bag must have started now
-						try{
-							bag = BagFactory.getInstance().getNewBag();
-							if (eliminateDuplicates)
-								bag.distinct();
-							else
-								bag.sort(sortSpec);
-							
-						}catch(IOException e){
-							throw new RuntimeException(e);
-						}
+						if (eliminateDuplicates) {
+							bag = BagFactory.getInstance().newDistinctBag();
+						} else {
+							bag = BagFactory.getInstance().newSortedBag(sortSpec);
+						}
 					}else{
 						addToSuccessor(d);
 					}

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java Fri Jan  4 14:58:20 2008
@@ -84,7 +84,7 @@
 			DataBag bag = (DataBag)d;
 			//flatten the bag and send it through the pipeline
 			successor.add(DataBag.startBag);
-		    Iterator<Tuple> iter = bag.content();
+		    Iterator<Tuple> iter = bag.iterator();
 	    	while(iter.hasNext())
 	    		successor.add(iter.next());
 	    	successor.add(DataBag.endBag);

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java Fri Jan  4 14:58:20 2008
@@ -47,11 +47,7 @@
 		}else{
 			if (checkDelimiter(d)){
 				//Bag must have started now
-				try{
-					bag = BagFactory.getInstance().getNewBag();
-				}catch(IOException e){
-					throw new RuntimeException(e);
-				}
+				bag = BagFactory.getInstance().newDefaultBag();
 			}else{
 				successor.add(d);
 			}

Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java Fri Jan  4 14:58:20 2008
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+/*
 package org.apache.pig.impl.io;
 
 import java.io.BufferedInputStream;
@@ -97,3 +98,4 @@
 		store.delete();
 	}
 }
+*/

Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java Fri Jan  4 14:58:20 2008
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+ /*
 package org.apache.pig.impl.io;
 
 import java.io.BufferedOutputStream;
@@ -66,3 +67,4 @@
 	}
 	
 }
+*/

Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/PigFile.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/PigFile.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/PigFile.java Fri Jan  4 14:58:20 2008
@@ -44,7 +44,7 @@
     }
     
     public DataBag load(LoadFunc lfunc, PigContext pigContext) throws IOException {
-        DataBag content = BagFactory.getInstance().getNewBag();
+        DataBag content = BagFactory.getInstance().newDefaultBag();
         InputStream is = FileLocalizer.open(file, pigContext);
         lfunc.bindTo(file, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
         Tuple f = null;
@@ -58,7 +58,7 @@
     public void store(DataBag data, StoreFunc sfunc, PigContext pigContext) throws IOException {
         BufferedOutputStream bos = new BufferedOutputStream(FileLocalizer.create(file, append, pigContext));
         sfunc.bindTo(bos);
-        for (Iterator<Tuple> it = data.content(); it.hasNext();) {
+        for (Iterator<Tuple> it = data.iterator(); it.hasNext();) {
             Tuple row = it.next();
             sfunc.putNext(row);
         }

Modified: incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java Fri Jan  4 14:58:20 2008
@@ -27,12 +27,14 @@
 import org.apache.log4j.Logger;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.eval.EvalSpec;
 import org.apache.pig.impl.io.PigFile;
 import org.apache.pig.impl.physicalLayer.POMapreduce;
 import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.PigLogger;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 import org.apache.hadoop.fs.Path;
@@ -99,7 +101,7 @@
      */
     public boolean launchPig(POMapreduce pom) throws IOException {
 	      
-	Logger log = pom.pigContext.getLogger();
+        Logger log = PigLogger.getLogger();
         JobConf conf = new JobConf(pom.pigContext.getConf());
         conf.setJobName(pom.pigContext.getJobName());
         boolean success = false;
@@ -120,7 +122,7 @@
 	{
 		FileOutputStream fos = new FileOutputStream(submitJarFile);
                	JarManager.createJar(fos, funcs, pom.pigContext);
-               	System.out.println("Job jar size = " + submitJarFile.length());
+               	log.debug("Job jar size = " + submitJarFile.length());
 		conf.setJar(submitJarFile.getPath());
         	String user = System.getProperty("user.name");
         	conf.setUser(user != null ? user : "Pigster");
@@ -228,7 +230,8 @@
             	
             		// create an empty output file
                 	PigFile f = new PigFile(outputFile.toString(), false);
-                	f.store(new DataBag(), new PigStorage(), pom.pigContext);
+                	f.store(BagFactory.getInstance().newDefaultBag(),
+                        new PigStorage(), pom.pigContext);
                 
             	}
 

Modified: incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigCombine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigCombine.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigCombine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigCombine.java Fri Jan  4 14:58:20 2008
@@ -28,7 +28,7 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.BigDataBag;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Datum;
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
@@ -47,17 +47,19 @@
     private OutputCollector oc;
     private int             index;
     private int             inputCount;
-    private BigDataBag      bags[];
-    private File            tmpdir;
+    private DataBag         bags[];
+    // private File            tmpdir;
 
     public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
             throws IOException {
 
         try {
+            /*
             tmpdir = new File(job.get("mapred.task.id"));
             tmpdir.mkdirs();
 
             BagFactory.init(tmpdir);
+            */
             PigContext pigContext = (PigContext) ObjectSerializer.deserialize(job.get("pig.pigContext"));
             if (evalPipe == null) {
                 inputCount = ((ArrayList<FileSpec>)ObjectSerializer.deserialize(job.get("pig.inputs"))).size();
@@ -69,9 +71,9 @@
                 evalPipe = esp.setupPipe(finalout);
                 //throw new RuntimeException("combine spec: " + evalSpec + " combine pipe: " + esp.toString());
                 
-                bags = new BigDataBag[inputCount];
+                bags = new DataBag[inputCount];
                 for (int i = 0; i < inputCount; i++) {
-                    bags[i] = BagFactory.getInstance().getNewBigBag();
+                    bags[i] = BagFactory.getInstance().newDefaultBag();
                 }
             }
 
@@ -94,7 +96,7 @@
                 t.getBagField(it.index + 1).add(it.toTuple());
             }
             for (int i = 0; i < inputCount; i++) {  // XXX: shouldn't we only do this if INNER flag is set?
-                if (t.getBagField(1 + i).isEmpty())
+                if (t.getBagField(1 + i).size() == 0)
                     return;
             }
 //          throw new RuntimeException("combine input: " + t.toString());

Modified: incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java Fri Jan  4 14:58:20 2008
@@ -87,7 +87,7 @@
     private int                       index;
     private int                       inputCount;
     private boolean                   isInner[];
-    private File                      tmpdir;
+    // private File                      tmpdir;
     private static PigContext pigContext = null;
     ArrayList<PigRecordWriter> sideFileWriters = new ArrayList<PigRecordWriter>();
 
@@ -100,9 +100,11 @@
         PigMapReduce.reporter = reporter;
 
         oc = output;
+        /*
         tmpdir = new File(job.get("mapred.task.id"));
         tmpdir.mkdirs();
         BagFactory.init(tmpdir);
+        */
 
         setupMapPipe(reporter);
 
@@ -125,10 +127,12 @@
         PigMapReduce.reporter = reporter;
         
         try {
+            /*
             tmpdir = new File(job.get("mapred.task.id"));
             tmpdir.mkdirs();
 
             BagFactory.init(tmpdir);
+            */
 
             oc = output;
             if (evalPipe == null) {
@@ -140,7 +144,7 @@
             Tuple t = new Tuple(1 + inputCount);
             t.setField(0, groupName);
             for (int i = 1; i < 1 + inputCount; i++) {
-                bags[i - 1] = BagFactory.getInstance().getNewBag();
+                bags[i - 1] = BagFactory.getInstance().newDefaultBag();
                 t.setField(i, bags[i - 1]);
             }
 
@@ -150,7 +154,7 @@
             }
             
             for (int i = 0; i < inputCount; i++) {
-                if (isInner[i] && t.getBagField(1 + i).isEmpty())
+                if (isInner[i] && t.getBagField(1 + i).size() == 0)
                     return;
             }
             

Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/IntermedResult.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/IntermedResult.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/IntermedResult.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/IntermedResult.java Fri Jan  4 14:58:20 2008
@@ -24,6 +24,7 @@
 import org.apache.pig.PigServer.ExecType;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
@@ -58,7 +59,7 @@
     
     public IntermedResult() {
         executed = true;
-        databag = new DataBag();
+        databag = BagFactory.getInstance().newDefaultBag();
     }
     
     public IntermedResult(DataBag bag) {

Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POCogroup.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POCogroup.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POCogroup.java Fri Jan  4 14:58:20 2008
@@ -121,7 +121,7 @@
 
             boolean done = true;
             for (int i = 0; i < inputs.length; i++) {
-                DataBag b = BagFactory.getInstance().getNewBag();
+                DataBag b = BagFactory.getInstance().newDefaultBag();
 
                 while (sortedInputs[i].size() > 0) {
                     Datum g = sortedInputs[i].get(0)[0];
@@ -139,7 +139,7 @@
                     }
                 }
 
-                if (specs.get(i).isInner() && b.isEmpty())
+                if (specs.get(i).isInner() && (b.size() == 0))
                     done = false; // this input uses "inner" semantics, and it has no tuples for
                                     // this group, so suppress the tuple we're currently building
 

Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java Fri Jan  4 14:58:20 2008
@@ -21,6 +21,8 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 
+import org.apache.log4j.Logger;
+
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.data.Tuple;
@@ -30,7 +32,7 @@
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.mapreduceExec.MapReduceLauncher;
 import org.apache.pig.impl.util.ObjectSerializer;
-
+import org.apache.pig.impl.util.PigLogger;
 
 public class POMapreduce extends PhysicalOperator {
 	private static final long serialVersionUID = 1L;
@@ -162,16 +164,16 @@
     }
 
     void print() {
-        System.out.println("\n----- MapReduce Job -----");
-        System.out.println("Input: " + inputFileSpecs);
-        System.out.println("Map: " + toMap);
-        System.out.println("Group: " + groupFuncs);
-        System.out.println("Combine: " + toCombine);
-        System.out.println("Reduce: " + toReduce);
-        System.out.println("Output: " + outputFileSpec);
-        System.out.println("Split: " + toSplit);
-        System.out.println("Map parallelism: " + mapParallelism);
-        System.out.println("Reduce parallelism: " + reduceParallelism);
+        Logger log = PigLogger.getLogger();
+        log.debug("Input: " + inputFileSpecs);
+        log.debug("Map: " + toMap);
+        log.debug("Group: " + groupFuncs);
+        log.debug("Combine: " + toCombine);
+        log.debug("Reduce: " + toReduce);
+        log.debug("Output: " + outputFileSpec);
+        log.debug("Split: " + toSplit);
+        log.debug("Map parallelism: " + mapParallelism);
+        log.debug("Reduce parallelism: " + reduceParallelism);
     }
     
     public POMapreduce copy(){

Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PORead.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PORead.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PORead.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PORead.java Fri Jan  4 14:58:20 2008
@@ -46,7 +46,7 @@
     	if (continueFromLast){
     		throw new RuntimeException("LOReads should not occur in continuous plans");
     	}
-        it = bag.content();
+        it = bag.iterator();
 
         return true;
     }

Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POSort.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POSort.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POSort.java Fri Jan  4 14:58:20 2008
@@ -43,14 +43,13 @@
 	public boolean open(boolean continueFromLast) throws IOException {
 		if (!super.open(continueFromLast))
 			return false;
-		DataBag bag = BagFactory.getInstance().getNewBag();
+		DataBag bag = BagFactory.getInstance().newSortedBag(sortSpec);
 		
-		bag.sort(sortSpec);
 		Tuple t;
 		while((t = inputs[0].getNext())!=null){
 			bag.add(t);
 		}
-		iter = bag.content();
+		iter = bag.iterator();
 		return true;
 	}
 	

Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POStore.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POStore.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POStore.java Fri Jan  4 14:58:20 2008
@@ -21,6 +21,7 @@
 
 import org.apache.pig.StoreFunc;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
@@ -62,7 +63,7 @@
     @Override
 	public Tuple getNext() throws IOException {
         // get all tuples from input, and store them.
-        DataBag b = new DataBag();
+        DataBag b = BagFactory.getInstance().newDefaultBag();
         Tuple t;
         while ((t = (Tuple) inputs[0].getNext()) != null) {
             b.add(t);

Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java Fri Jan  4 14:58:20 2008
@@ -21,6 +21,7 @@
 import java.util.Map;
 
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 
@@ -33,7 +34,7 @@
     }
 
     public DataBag exec(boolean continueFromLast) throws IOException {
-        DataBag results = new DataBag();
+        DataBag results = BagFactory.getInstance().newDefaultBag();
 
         root.open(continueFromLast);
         Tuple t;

Added: incubator/pig/trunk/src/org/apache/pig/impl/util/PigLogger.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/PigLogger.java?rev=609048&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/util/PigLogger.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/util/PigLogger.java Fri Jan  4 14:58:20 2008
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.Level;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.PatternLayout;
+
+public class PigLogger 
+{
+
+private static Logger mLogger = null;
+private static boolean mHaveSetAppenders = false;
+
+/**
+ * Get an instance of the underlying log4j logger.  This first makes sure
+ * the PigLogger is initialized and then returns the underlying logger.
+ */ 
+public static Logger getLogger()
+{
+	if (mLogger == null) {
+		mLogger = Logger.getLogger("org.apache.pig");
+	}
+	return mLogger;
+}
+
+/**
+ * Set up a log appender for the junit tests, this way they can write out log
+ * messages.
+ */
+public static void setAppenderForJunit()
+{
+	if (!mHaveSetAppenders) {
+		Logger log = getLogger();
+		log.setLevel(Level.INFO);
+		ConsoleAppender screen = new ConsoleAppender(new PatternLayout());
+		screen.setThreshold(Level.INFO);
+		screen.setTarget(ConsoleAppender.SYSTEM_ERR);
+		log.addAppender(screen);
+		mHaveSetAppenders = true;
+	}
+}
+
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=609048&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Fri Jan  4 14:58:20 2008
@@ -0,0 +1,145 @@
+package org.apache.pig.impl.util;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryNotificationInfo;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryType;
+import java.lang.ref.WeakReference;
+import java.util.LinkedList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.management.Notification;
+import javax.management.NotificationEmitter;
+import javax.management.NotificationListener;
+import javax.management.openmbean.CompositeData;
+
+import org.apache.log4j.Logger;
+
+/**
+ * This class tracks the tenured pool and a list of Spillable objects. When memory gets low, this
+ * class will start requesting Spillable objects to free up memory.
+ * <p>
+ * Low memory is defined as more than 50% of the tenured pool being allocated. Spillable objects are
+ * tracked using WeakReferences so that the objects can be GCed even though this class has a reference
+ * to them. 
+ *
+ */
+public class SpillableMemoryManager implements NotificationListener {
+    List<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+    
+    public SpillableMemoryManager() {
+        ((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null);
+        List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans();
+        MemoryPoolMXBean biggestHeap = null;
+        long biggestSize = 0;
+        for (MemoryPoolMXBean b: mpbeans) {
+            PigLogger.getLogger().debug("Found heap (" + b.getName() +
+                ") of type " + b.getType());
+            if (b.getType() == MemoryType.HEAP) {
+                /* Here we are making the leap of faith that the biggest
+                 * heap is the tenured heap
+                 */
+                long size = b.getUsage().getMax();
+                if (size > biggestSize) {
+                    biggestSize = size;
+                    biggestHeap = b;
+                }
+            }
+        }
+        if (biggestHeap == null) {
+            throw new RuntimeException("Couldn't find heap");
+        }
+        PigLogger.getLogger().debug("Selected heap to monitor (" +
+            biggestHeap.getName() + ")");
+        /* We set the threshold to be 50% of tenured since that is where
+         * the GC starts to dominate CPU time according to Sun doc */
+        biggestHeap.setCollectionUsageThreshold((long)(biggestSize*.5));    
+    }
+    
+    public void handleNotification(Notification n, Object o) {
+        CompositeData cd = (CompositeData) n.getUserData();
+        MemoryNotificationInfo info = MemoryNotificationInfo.from(cd);
+        PigLogger.getLogger().info("low memory handler called " + info.getUsage());
+        long toFree = info.getUsage().getUsed() - (long)(info.getUsage().getMax()*.5);
+        if (toFree < 0) {
+            PigLogger.getLogger().debug("low memory handler returning " + 
+                "because there is nothing to free");
+            return;
+        }
+        synchronized(spillables) {
+            // Walk the list first and remove nulls, otherwise the sort
+            // takes way too long.
+            Iterator<WeakReference<Spillable>> i;
+            for (i = spillables.iterator(); i.hasNext();) {
+                Spillable s = i.next().get();
+                if (s == null) {
+                    i.remove();
+                }
+            }
+            Collections.sort(spillables, new Comparator<WeakReference<Spillable>>() {
+
+                /**
+                 * We don't lock anything, so this sort may not be stable if a WeakReference suddenly
+                 * becomes null, but it will be close enough.
+                 */    
+                @Override
+                public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) {
+                    Spillable o1 = o1Ref.get();
+                    Spillable o2 = o2Ref.get();
+                    if (o1 == null && o2 == null) {
+                        return 0;
+                    }
+                    if (o1 == null) {
+                        return -1;
+                    }
+                    if (o2 == null) {
+                        return 1;
+                    }
+                    long o1Size = o1.getMemorySize();
+                    long o2Size = o2.getMemorySize();
+                
+                    if (o1Size == o2Size) {
+                        return 0;
+                    }
+                    if (o1Size < o2Size) {
+                        return -1;
+                    }
+                    return 1;
+                }
+            });
+            long estimatedFreed = 0;
+            for (i = spillables.iterator(); i.hasNext();) {
+                Spillable s = i.next().get();
+                // Still need to check for null here, even after we removed
+                // above, because the reference may have gone bad on us
+                // since the last check.
+                if (s == null) {
+                    i.remove();
+                    continue;
+                }
+                long toBeFreed = s.getMemorySize();
+                s.spill();
+                estimatedFreed += toBeFreed;
+                if (estimatedFreed > toFree) {
+                    break;
+                }
+            }
+            /* Poke the GC again to see if we successfully freed enough memory */
+            System.gc();
+        }
+    }
+    
+    /**
+     * Register a spillable to be tracked. No need to unregister, the tracking will stop
+     * when the spillable is GCed.
+     * @param s the spillable to track.
+     */
+    public void registerSpillable(Spillable s) {
+        synchronized(spillables) {
+            spillables.add(new WeakReference<Spillable>(s));
+        }
+    }
+}

Modified: incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Fri Jan  4 14:58:20 2008
@@ -98,6 +98,10 @@
 	protected void processDescribe(String alias) throws IOException {
 		mPigServer.dumpSchema(alias);
 	}
+
+    protected void processExplain(String alias) throws IOException {
+        mPigServer.explain(alias, System.out);
+    }
 	
 	protected void processRegister(String jar) throws IOException {
 		mPigServer.registerJar(jar);
@@ -300,4 +304,4 @@
 	private JobClient mJobClient;
 	private boolean mDone;
 
-}
\ No newline at end of file
+}

Modified: incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Fri Jan  4 14:58:20 2008
@@ -47,6 +47,8 @@
 	abstract protected void processRegisterFunc(String name, String expr);
 	
 	abstract protected void processDescribe(String alias) throws IOException;
+
+    abstract protected void processExplain(String alias) throws IOException;
 	
 	abstract protected void processRegister(String jar) throws IOException;
 
@@ -113,6 +115,7 @@
 TOKEN: {<DEFINE: "define">}
 TOKEN: {<DUMP: "dump">}
 TOKEN: {<DESCRIBE: "describe">}
+TOKEN: {<EXPLAIN: "explain">}
 TOKEN: {<HELP: "help">}
 TOKEN: {<KILL: "kill">}
 TOKEN: {<LS: "ls">}
@@ -301,6 +304,10 @@
 	t1 = <IDENTIFIER>
 	{processDescribe(t1.image);}
 	|
+    <EXPLAIN>
+	t1 = <IDENTIFIER>
+	{processExplain(t1.image);}
+	|
 	<HELP>
 	{printHelp();}
 	|
@@ -446,6 +453,8 @@
 	t = <DUMP>
 	|
 	t = <DESCRIBE>
+	|
+	t = <EXPLAIN>
 	|
 	t = <HELP>
 	|

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Fri Jan  4 14:58:20 2008
@@ -91,7 +91,7 @@
         Tuple t3 = new Tuple(2);
         t3.setField(0, 82.0);
         t3.setField(1, 17);
-        DataBag bag = new DataBag();
+        DataBag bag = BagFactory.getInstance().newDefaultBag();
         bag.add(t1);
         bag.add(t2);
         bag.add(t3);
@@ -332,6 +332,7 @@
         assertTrue(f3.arity() == arity3);
     }
 
+    /*
     @Test
     public void testLFBin() throws Exception {
 
@@ -395,6 +396,7 @@
         assertTrue(r1.equals(t1));
         assertTrue(r2.equals(t5));
     }
+    */
 
     
     @Test
@@ -488,8 +490,8 @@
     	for (int i=0; i< numTimes; i++){
     		Tuple t = iter.next();
     		
-    		assertEquals(i+"AA", t.getBagField(0).content().next().getAtomField(0).strval());
-    		assertEquals(i+"BB", t.getBagField(1).content().next().getAtomField(0).strval());
+    		assertEquals(i+"AA", t.getBagField(0).iterator().next().getAtomField(0).strval());
+    		assertEquals(i+"BB", t.getBagField(1).iterator().next().getAtomField(0).strval());
     		
     	}
     	

Added: incubator/pig/trunk/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestDataBag.java?rev=609048&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestDataBag.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestDataBag.java Fri Jan  4 14:58:20 2008
@@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+/*
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Iterator;
+import java.util.Random;
+*/
+
+import java.util.*;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.pig.data.*;
+import org.apache.pig.impl.eval.*;
+import org.apache.pig.impl.util.Spillable;
+
+/**
+ * This class will exercise the basic Pig data model and members. It tests for proper behavior in
+ * assignment and comparison, as well as function application.
+ * 
+ * @author dnm
+ */
+public class TestDataBag extends junit.framework.TestCase {
+
+    private Random rand = new Random();
+
+    private class TestMemoryManager {
+        ArrayList<Spillable> mManagedObjects = new ArrayList<Spillable>();
+
+        public void register(Spillable s) {
+            mManagedObjects.add(s);
+        }
+
+        public void forceSpill() throws IOException {
+            Iterator<Spillable> i = mManagedObjects.iterator();
+            while (i.hasNext()) i.next().spill();
+        }
+    }
+
+    // Need to override the regular bag factory so I can register with my local
+    // memory manager.
+    private class LocalBagFactory {
+        TestMemoryManager mMemMgr;
+
+        public LocalBagFactory(TestMemoryManager mgr) {
+            mMemMgr = mgr;
+        }
+
+        public DataBag newDefaultBag() {
+            DataBag bag = new DefaultDataBag();
+            mMemMgr.register(bag);
+            return bag;
+        }
+
+        public DataBag newSortedBag(EvalSpec sortSpec) {
+            DataBag bag = new SortedDataBag(sortSpec);
+            mMemMgr.register(bag);
+            return bag;
+        }
+
+        public DataBag newDistinctBag() {
+            DataBag bag = new DistinctDataBag();
+            mMemMgr.register(bag);
+            return bag;
+        }
+    }
+
+    // Test reading and writing default from memory, no spills.
+    @Test
+    public void testDefaultInMemory() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDefaultBag();
+        ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(10);
+
+        // Write tuples into both
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+   // Test reading and writing default from file with one spill
+    @Test
+    public void testDefaultSingleSpill() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDefaultBag();
+        ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(10);
+
+        // Write tuples into both
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+   // Test reading and writing default from file with three spills
+    @Test
+    public void testDefaultTripleSpill() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDefaultBag();
+        ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(30);
+
+        // Write tuples into both
+        for (int j = 0; j < 3; j++) {
+            for (int i = 0; i < 10; i++) {
+                Tuple t = new Tuple(new DataAtom(i));
+                b.add(t);
+                rightAnswer.add(t);
+            }
+            mgr.forceSpill();
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+    // Test reading with some in file, some in memory.
+    @Test
+    public void testDefaultInMemInFile() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDefaultBag();
+        ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(20);
+
+        // Write tuples into both
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+   // Test reading with a spill happening in the middle of the read.
+    @Test
+    public void testDefaultSpillDuringRead() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDefaultBag();
+        ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(20);
+
+        // Write tuples into both
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        for (int i = 0; i < 15; i++) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        mgr.forceSpill();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+    // Test reading and writing sorted from memory, no spills.
+    @Test
+    public void testSortedInMemory() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newSortedBag(null);
+        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(10);
+
+        // Write tuples into both
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+
+        Tuple t;
+        while ((t = rightAnswer.poll()) != null) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), t);
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+   // Test reading and writing sorted from file with one spill
+    @Test
+    public void testSortedSingleSpill() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newSortedBag(null);
+        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(10);
+
+        // Write tuples into both
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+        Tuple t;
+        while ((t = rightAnswer.poll()) != null) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), t);
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+   // Test reading and writing sorted from file with three spills
+    @Test
+    public void testSortedTripleSpill() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newSortedBag(null);
+        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(30);
+
+        // Write tuples into both
+        for (int j = 0; j < 3; j++) {
+            for (int i = 0; i < 10; i++) {
+                Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+                b.add(t);
+                rightAnswer.add(t);
+            }
+            mgr.forceSpill();
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+
+        Tuple t;
+        while ((t = rightAnswer.poll()) != null) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), t);
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+    // Test reading with some in file, some in memory.
+    @Test
+    public void testSortedInMemInFile() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newSortedBag(null);
+        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20);
+
+        // Write tuples into both
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+        Tuple t;
+        while ((t = rightAnswer.poll()) != null) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), t);
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+    // Test reading with a spill happening in the middle of the read.
+    @Test
+    public void testSortedSpillDuringRead() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newSortedBag(null);
+        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20);
+
+        // Write tuples into both
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+
+        for (int i = 0; i < 15; i++) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rightAnswer.poll());
+        }
+
+        mgr.forceSpill();
+
+        Tuple t;
+        while ((t = rightAnswer.poll()) != null) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), t);
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+    // Test reading with first spill happening in the middle of the read.
+    @Test
+    public void testSortedFirstSpillDuringRead() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newSortedBag(null);
+        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20);
+
+        for (int i = 0; i < 10; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+
+        for (int i = 0; i < 5; i++) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rightAnswer.poll());
+        }
+
+        mgr.forceSpill();
+
+        Tuple t;
+        while ((t = rightAnswer.poll()) != null) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), t);
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+   // Test reading and writing sorted file with so many spills it requires
+   // premerge.
+    @Test
+    public void testSortedPreMerge() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newSortedBag(null);
+        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(30);
+
+        // Write tuples into both
+        for (int j = 0; j < 373; j++) {
+            for (int i = 0; i < 10; i++) {
+                Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+                b.add(t);
+                rightAnswer.add(t);
+            }
+            mgr.forceSpill();
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+
+        Tuple t;
+        while ((t = rightAnswer.poll()) != null) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), t);
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+    // Test reading and writing distinct from memory, no spills.
+    @Test
+    public void testDistinctInMemory() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+        // Write tuples into both
+        for (int i = 0; i < 50; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back, hopefully they come out in the same order.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", bIter.next(), rIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+    // Test reading and writing a distinct bag after one forced spill.
+    @Test
+    public void testDistinctSingleSpill() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+        // Write the same tuples into the bag and into the reference TreeSet.
+        for (int i = 0; i < 50; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        // Read tuples back; the bag must yield the same sequence as the TreeSet.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+        // assertEquals takes (message, expected, actual): rightAnswer is the expected side.
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", rIter.next(), bIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+    // Test reading and writing a distinct bag after three forced spills.
+    @Test
+    public void testDistinctTripleSpill() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+        // Write three batches into both, forcing a spill after each batch.
+        for (int j = 0; j < 3; j++) {
+            for (int i = 0; i < 50; i++) {
+                Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+                b.add(t);
+                rightAnswer.add(t);
+            }
+            mgr.forceSpill();
+        }
+
+        // Read tuples back; the bag must yield the same sequence as the TreeSet.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+        // assertEquals takes (message, expected, actual): rightAnswer is the expected side.
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", rIter.next(), bIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+    // Test reading a distinct bag with some tuples added before a spill, some after.
+    @Test
+    public void testDistinctInMemInFile() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+        // Write the first batch into both, then force it to spill.
+        for (int i = 0; i < 50; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+        // Second batch is added after the forced spill; no further spill is requested.
+        for (int i = 0; i < 50; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back; the bag must yield the same sequence as the TreeSet.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+        // assertEquals takes (message, expected, actual): rightAnswer is the expected side.
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", rIter.next(), bIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+    // Test reading a distinct bag with a spill forced in the middle of the read.
+    @Test
+    public void testDistinctSpillDuringRead() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+        // Write the first batch into both, then force it to spill.
+        for (int i = 0; i < 50; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+        // Second batch is added after the first forced spill.
+        for (int i = 0; i < 50; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back; the bag must yield the same sequence as the TreeSet.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+        // assertEquals takes (message, expected, actual): rightAnswer is the expected side.
+        for (int i = 0; i < 5; i++) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", rIter.next(), bIter.next());
+        }
+        // Force another spill while both iterators are part-way through.
+        mgr.forceSpill();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", rIter.next(), bIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+    // Test reading and writing a distinct bag with enough forced spills (321)
+    // to force a pre-merge.
+    @Test
+    public void testDistinctPreMerge() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+        // Write 321 batches into both, forcing a spill after each batch.
+        for (int j = 0; j < 321; j++) {
+            for (int i = 0; i < 50; i++) {
+                Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+                b.add(t);
+                rightAnswer.add(t);
+            }
+            mgr.forceSpill();
+        }
+
+        // Read tuples back; the bag must yield the same sequence as the TreeSet.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+        // assertEquals takes (message, expected, actual): rightAnswer is the expected side.
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", rIter.next(), bIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag",
+            bIter.hasNext());
+    }
+
+}
+
+
+

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java Fri Jan  4 14:58:20 2008
@@ -159,6 +159,7 @@
         assertTrue(n1.arity() == n1Arity + n2Arity);
     }
 
+    /*
     @Test
     public void testDataBag() throws Exception {
         int[] input1 = { 1, 2, 3, 4, 5 };
@@ -217,6 +218,7 @@
     	Runtime.getRuntime().gc();
     	testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 1000000);
     }
+    */
 
     private enum TestType {
     	PRE_SORT,
@@ -227,6 +229,7 @@
     }
        
     
+    /*
     private void testBigDataBag(long freeMemoryToMaintain, int numItems) throws Exception {
     	BigDataBag.FREE_MEMORY_TO_MAINTAIN = freeMemoryToMaintain;
         Random r = new Random();
@@ -288,5 +291,6 @@
         if (testType != TestType.NONE)
         	assertTrue(bag.numNotifies >= count/DataBag.notifyInterval);
     }
+    */
 
 }

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Fri Jan  4 14:58:20 2008
@@ -37,9 +37,7 @@
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.builtin.TextLoader;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.PigFile;
 
@@ -134,7 +132,7 @@
 	@Test
 	public void testMapLookup() throws IOException{
 		PigServer pigServer = new PigServer(initString);
-		DataBag b = new DataBag();
+		DataBag b = BagFactory.getInstance().newDefaultBag();
 		DataMap colors = new DataMap();
 		colors.put("apple","red");
 		colors.put("orange","orange");

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestMapReduce.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestMapReduce.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestMapReduce.java Fri Jan  4 14:58:20 2008
@@ -78,7 +78,7 @@
     	}
         @Override
 		public void exec(Tuple input, DataBag output) throws IOException {
-            Iterator<Tuple> it = (input.getBagField(0)).content();
+            Iterator<Tuple> it = (input.getBagField(0)).iterator();
             while(it.hasNext()) {
                 Tuple t = it.next();
                 Tuple newT = new Tuple(2);

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestPigFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestPigFile.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestPigFile.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestPigFile.java Fri Jan  4 14:58:20 2008
@@ -33,18 +33,14 @@
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.DataAtom;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Datum;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
 import org.apache.pig.PigServer.ExecType;
 import org.apache.pig.impl.io.PigFile;
 import org.apache.pig.impl.PigContext;
 
 public class TestPigFile extends TestCase {
 
-    DataBag bag          = new DataBag();
+    DataBag bag          = BagFactory.getInstance().newDefaultBag();
     Random rand = new Random();
     
     @Override
@@ -89,10 +85,10 @@
         DataBag loaded = load.load(new PigStorage(), pigContext);
         System.out.println("Done.");
 
-        assertTrue(bag.cardinality() == loaded.cardinality());
+        assertTrue(bag.size() == loaded.size());
 
-        Iterator<Tuple> it1 = bag.content();
-        Iterator<Tuple> it2 = loaded.content();
+        Iterator<Tuple> it1 = bag.iterator();
+        Iterator<Tuple> it2 = loaded.iterator();
         while (it1.hasNext() && it2.hasNext()) {
             Tuple f1 = it1.next();
             Tuple f2 = it2.next();
@@ -131,7 +127,7 @@
     
     private DataBag getRandomBag(int maxCardinality, int nestingLevel) throws IOException{
     	int cardinality = rand.nextInt(maxCardinality)+1;
-    	DataBag b = new DataBag();
+    	DataBag b = BagFactory.getInstance().newDefaultBag();
     	for (int i=0; i<cardinality; i++){
     		Tuple t = getRandomTuple(nestingLevel+1); 
     		b.add(t);
@@ -168,10 +164,10 @@
         DataBag loaded = load.load(new BinStorage(), pigContext);
         System.out.println("Done.");
 
-        assertTrue(bag.cardinality() == loaded.cardinality());
+        assertTrue(bag.size() == loaded.size());
 
-        Iterator<Tuple> it1 = bag.content();
-        Iterator<Tuple> it2 = loaded.content();
+        Iterator<Tuple> it1 = bag.iterator();
+        Iterator<Tuple> it2 = loaded.iterator();
         while (it1.hasNext() && it2.hasNext()) {
             Tuple f1 = it1.next();
             Tuple f2 = it2.next();

Modified: incubator/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/Util.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/Util.java Fri Jan  4 14:58:20 2008
@@ -19,8 +19,7 @@
 
 import java.io.IOException;
 
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
 
 public class Util {
     // Helper Functions
@@ -40,7 +39,7 @@
     }
 
     static public Tuple loadNestTuple(Tuple t, int[] input) throws IOException {
-        DataBag bag = new DataBag();
+        DataBag bag = BagFactory.getInstance().newDefaultBag();
         for(int i = 0; i < input.length; i++) {
             Tuple f = new Tuple(1);
             f.setField(0, input[i]);
@@ -52,7 +51,7 @@
 
     static public Tuple loadNestTuple(Tuple t, int[][] input) throws IOException {
         for (int i = 0; i < input.length; i++) {
-            DataBag bag = new DataBag();
+            DataBag bag = BagFactory.getInstance().newDefaultBag();
             Tuple f = loadFlatTuple(new Tuple(input[i].length), input[i]);
             bag.add(f);
             t.setField(i, bag);
@@ -62,7 +61,7 @@
 
     static public Tuple loadTuple(Tuple t, String[][] input) throws IOException {
         for (int i = 0; i < input.length; i++) {
-            DataBag bag = new DataBag();
+            DataBag bag = BagFactory.getInstance().newDefaultBag();
             Tuple f = loadTuple(new Tuple(input[i].length), input[i]);
             bag.add(f);
             t.setField(i, bag);