Posted to commits@pig.apache.org by ga...@apache.org on 2009/10/28 17:26:59 UTC

svn commit: r830664 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ test/org/apache/pig/test/

Author: gates
Date: Wed Oct 28 16:26:58 2009
New Revision: 830664

URL: http://svn.apache.org/viewvc?rev=830664&view=rev
Log:
PIG-1037: Converted sorted and distinct bags to use the new active spilling paradigm.
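
The new bags read their settings from the job configuration. Based on the keys
referenced in this patch, behavior can be tuned roughly as follows (illustrative
only; the patch only shows these keys being read from PigMapReduce.sJobConf, so
how they reach the job conf depends on how the job is launched, e.g. via
pig.properties or -D on the pig command line):

    # Fraction of the heap each cached bag may use before spilling (0.1 if unset)
    pig.cachedbag.memusage=0.2
    # Set to "default" to fall back to the old SortedDataBag / DistinctDataBag
    pig.cachedbag.sort.type=default
    pig.cachedbag.distinct.type=default
    # Same switch for the bags built in POCombinerPackage
    pig.cachedbag.type=default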

Added:
    hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java
    hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Oct 28 16:26:58 2009
@@ -26,6 +26,9 @@
 
 IMPROVEMENTS
 
+PIG-1037: Converted sorted and distinct bags to use the new active spilling
+          paradigm (yinghe via gates)
+
 PIG-1051: FINDBUGS: NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE (olgan)
 
 PIG-1050: FINDBUGS: DLS_DEAD_LOCAL_STORE: Dead store to local variable (olgan)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Wed Oct 28 16:26:58 2009
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -32,6 +33,7 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.NonSpillableDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -120,13 +122,25 @@
         keyLookup = lrKeyInfo.second;
     }
 
+    private DataBag createDataBag() {
+    	String bagType = null;
+        if (PigMapReduce.sJobConf != null) {
+   			bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");       			
+   	    }
+                		          	           		
+    	if (bagType != null && bagType.equalsIgnoreCase("default")) {
+    		return new NonSpillableDataBag();
+    	}
+    	return new InternalCachedBag();  	
+    }
+    
     @Override
     public Result getNext(Tuple t) throws ExecException {
         int keyField = -1;
         //Create numInputs bags
         Object[] fields = new Object[mBags.length];
         for (int i = 0; i < mBags.length; i++) {
-            if (mBags[i]) fields[i] = new NonSpillableDataBag();
+            if (mBags[i]) fields[i] = createDataBag();            
         }
         
         // For each indexed tup in the inp, split them up and place their

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Wed Oct 28 16:26:58 2009
@@ -24,6 +24,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -31,6 +32,8 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalDistinctBag;
+import org.apache.pig.data.InternalSortedBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -73,9 +76,21 @@
 
     @Override
     public Result getNext(Tuple t) throws ExecException {
-        if (!inputsAccumulated) {
-            Result in = processInput();
-            distinctBag = BagFactory.getInstance().newDistinctBag();
+         if (!inputsAccumulated) {
+            Result in = processInput();    
+            
+            // By default, we create an InternalDistinctBag, unless the user
+            // explicitly configures the old bag type.
+           	String bagType = null;
+            if (PigMapReduce.sJobConf != null) {
+       			bagType = PigMapReduce.sJobConf.get("pig.cachedbag.distinct.type");       			
+       	    }            
+            if (bagType != null && bagType.equalsIgnoreCase("default")) {        	    	
+            	distinctBag = BagFactory.getInstance().newDistinctBag();    			
+       	    } else {
+       	    	distinctBag = new InternalDistinctBag(3);
+    	    }
+            
             while (in.returnStatus != POStatus.STATUS_EOP) {
                 if (in.returnStatus == POStatus.STATUS_ERR) {
                     log.error("Error in reading from inputs");

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Wed Oct 28 16:26:58 2009
@@ -27,6 +27,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -38,6 +39,8 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.InternalSortedBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -255,9 +258,21 @@
 	@Override
 	public Result getNext(Tuple t) throws ExecException {
 		Result res = new Result();
+		
 		if (!inputsAccumulated) {
-			res = processInput();
-            sortedBag = BagFactory.getInstance().newSortedBag(mComparator);
+			res = processInput();         
+			// By default, we create an InternalSortedBag, unless the user
+			// explicitly configures the old bag type.
+			String bagType = null;
+	        if (PigMapReduce.sJobConf != null) {
+	   			bagType = PigMapReduce.sJobConf.get("pig.cachedbag.sort.type");       			
+	   	    }	        
+            if (bagType != null && bagType.equalsIgnoreCase("default")) {        	    	
+            	sortedBag = BagFactory.getInstance().newSortedBag(mComparator);     			
+       	    } else {
+    	    	sortedBag = new InternalSortedBag(3, mComparator);
+    	    }
+            
 			while (res.returnStatus != POStatus.STATUS_EOP) {
 				if (res.returnStatus == POStatus.STATUS_ERR) {
 					log.error("Error in reading from the inputs");

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java Wed Oct 28 16:26:58 2009
@@ -23,8 +23,10 @@
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalDistinctBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.WrappedIOException;
@@ -114,8 +116,23 @@
         }
     }
     
+    static private DataBag createDataBag() {
+    	// By default, we create an InternalDistinctBag, unless the user
+    	// explicitly configures the old bag type.
+    	String bagType = null;
+        if (PigMapReduce.sJobConf != null) {     
+   			bagType = PigMapReduce.sJobConf.get("pig.cachedbag.distinct.type");       			
+   	    }
+                      
+    	if (bagType != null && bagType.equalsIgnoreCase("default")) {        	    	
+        	return BagFactory.getInstance().newDistinctBag();    			
+   	    } else {   	    	
+   	    	return new InternalDistinctBag(3);
+	    }
+    }
+    
     static private DataBag getDistinctFromNestedBags(Tuple input, EvalFunc evalFunc) throws IOException {
-        DataBag result = bagFactory.newDistinctBag();
+        DataBag result = createDataBag();
         long progressCounter = 0;
         try {
             DataBag bg = (DataBag)input.get(0);
@@ -140,7 +157,7 @@
     protected DataBag getDistinct(Tuple input) throws IOException {
         try {
             DataBag inputBg = (DataBag)input.get(0);
-            DataBag result = bagFactory.newDistinctBag();
+            DataBag result = createDataBag();
             long progressCounter = 0;
             for (Tuple tuple : inputBg) {
                 result.add(tuple);

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Wed Oct 28 16:26:58 2009
@@ -136,6 +136,16 @@
             used *= numInMem;
         }
 
+        // add up the overhead for this object, mContents object, references to tuples,
+        // and other object variables
+        used += 12 + 12 + numInMem*4 + 8 + 4 + 8;
+        
+        // add up overhead for mSpillFiles ArrayList, Object[] inside ArrayList,
+        // object variable inside ArrayList and references to spill files
+        if (mSpillFiles != null) {
+        	used += 12 + 12 + 4 + mSpillFiles.size()*4;
+        }
+        
         mMemSize = used;
         mMemSizeChanged = false;
         return used;
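
As a worked example of the new estimate (the constants are the patch's own
approximations of JVM object and reference overhead): a bag holding 1,000 tuples
in memory now charges 12 + 12 + 1000*4 + 8 + 4 + 8 = 4,044 bytes of bookkeeping
on top of the tuples' own getMemorySize() totals, and a bag that has already
written 2 spill files charges a further 12 + 12 + 4 + 2*4 = 36 bytes for the
mSpillFiles list.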
@@ -181,19 +191,20 @@
             // of it so I can guarantee order.
             DataBag thisClone;
             DataBag otherClone;
-            if (this instanceof SortedDataBag ||
-                    this instanceof DistinctDataBag) {
+            BagFactory factory = BagFactory.getInstance();
+            
+            if (this.isSorted() || this.isDistinct()) {
                 thisClone = this;
             } else {
-                thisClone = new SortedDataBag(null);
+                thisClone = factory.newSortedBag(null);
                 Iterator<Tuple> i = iterator();
                 while (i.hasNext()) thisClone.add(i.next());
+                
             }
-            if (other instanceof SortedDataBag ||
-                    other instanceof DistinctDataBag) {
+            if (((DataBag) other).isSorted() || ((DataBag)other).isDistinct()) {
                 otherClone = bOther;
             } else {
-                otherClone = new SortedDataBag(null);
+                otherClone = factory.newSortedBag(null);
                 Iterator<Tuple> i = bOther.iterator();
                 while (i.hasNext()) otherClone.add(i.next());
             }

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java Wed Oct 28 16:26:58 2009
@@ -183,7 +183,10 @@
      */
     public long getMemorySize() {
         Iterator<Object> i = mFields.iterator();
-        long sum = 0;
+        // initial memory overhead for Tuple object, ArrayList object
+        // and Object[] inside ArrayList, plus references to each tuple field,
+        // plus other object variables
+        long sum = 12*3 + mFields.size()*4 + 8;
         while (i.hasNext()) {
             sum += getFieldMemorySize(i.next());
         }
@@ -307,12 +310,12 @@
 
             case DataType.TUPLE: {
                 Tuple t = (Tuple)o;
-                return t.getMemorySize() + 12;
+                return t.getMemorySize();
             }
 
             case DataType.BAG: {
                 DataBag b = (DataBag)o;
-                return b.getMemorySize() + 12;
+                return b.getMemorySize();
             }
 
             case DataType.INTEGER:
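
With this change an n-field DefaultTuple starts from 12*3 + 4n + 8 = 44 + 4n
bytes before the per-field sizes are added (64 bytes for a 5-field tuple, for
example), and the flat 12-byte surcharge previously added on top of a nested
tuple's or bag's own getMemorySize() is dropped, since that overhead is now
counted inside getMemorySize() itself.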

Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Wed Oct 28 16:26:58 2009
@@ -19,6 +19,7 @@
 
 import java.io.*;
 import java.util.*;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -41,7 +42,7 @@
     }
 
     public InternalCachedBag(int bagCount) {       
-        float percent = 0.5F;
+        float percent = 0.1F;
         
     	if (PigMapReduce.sJobConf != null) {
     		String usage = PigMapReduce.sJobConf.get("pig.cachedbag.memusage");
@@ -82,12 +83,14 @@
                 
         if(mContents.size() < cacheLimit)  {
             mMemSizeChanged = true;
-            mContents.add(t);
+            mContents.add(t);           
             if(mContents.size() < 100)
             {
                 memUsage += t.getMemorySize();
                 long avgUsage = memUsage / (long)mContents.size();
-                cacheLimit = (int)(maxMemUsage / avgUsage);
+                if (avgUsage > 0) {
+                	cacheLimit = (int)(maxMemUsage / avgUsage);
+                }
             }
         } else {
             try {
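
The new avgUsage > 0 guard matters when the first tuples report a memory size of
zero, e.g. empty tuples from a Tuple implementation that charges no per-object
overhead. A minimal illustration of the failure mode it prevents (hypothetical
values, not code from the patch):

    long memUsage = 0;                       // first tuples reported size 0
    long maxMemUsage = 100L * 1024 * 1024;
    long avgUsage = memUsage / 1;            // 0
    int cacheLimit = (int)(maxMemUsage / avgUsage);  // ArithmeticException: / by zero

With the guard, cacheLimit simply keeps its previous value. The new
testInternalCachedBag case below, which adds two empty tuples, covers this
scenario.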
@@ -107,6 +110,20 @@
         mSize++;
     }
 
+    public void addAll(DataBag b) {
+    	Iterator<Tuple> iter = b.iterator();
+    	while(iter.hasNext()) {
+    		add(iter.next());
+    	}
+    }
+
+    public void addAll(Collection<Tuple> c) {
+    	Iterator<Tuple> iter = c.iterator();
+    	while(iter.hasNext()) {
+    		add(iter.next());
+    	}
+    }
+    
     private void addDone() {
         if(out != null) {
             try {

Added: hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=830664&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Wed Oct 28 16:26:58 2009
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigWarning;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+
+
+
+/**
+ * An unordered collection of Tuples with no multiples.  Data is
+ * stored without duplicates as it comes in.  When it is time to spill,
+ * that data is sorted and written to disk.  The data is
+ * stored in a HashSet.  When it is time to sort it is placed in an
+ * ArrayList and then sorted.  Despite all these machinations, this was
+ * found to be faster than storing it in a TreeSet.
+ * 
+ * This bag spills proactively when the number of tuples in memory
+ * reaches a limit.
+ */
+public class InternalDistinctBag extends DefaultAbstractBag {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 2L;
+
+    private static final Log log = LogFactory.getLog(InternalDistinctBag.class);
+
+    private static TupleFactory gTupleFactory = TupleFactory.getInstance();
+    
+    private transient boolean mReadStarted = false;
+    
+    private transient int cacheLimit;
+    private transient long maxMemUsage;
+    private transient long memUsage;    
+
+    public InternalDistinctBag() {
+        this(1, -1.0);
+    }
+    
+    public InternalDistinctBag(int bagCount) {        
+    	this(bagCount, -1.0);
+    }
+    
+    public InternalDistinctBag(int bagCount, double percent) {        
+        if (percent < 0) {
+        	percent = 0.1F;            
+        	if (PigMapReduce.sJobConf != null) {
+        		String usage = PigMapReduce.sJobConf.get("pig.cachedbag.memusage");
+        		if (usage != null) {
+        			percent = Float.parseFloat(usage);
+        		}
+        	}
+        }
+           	
+        init(bagCount, percent);        
+    }
+
+    private void init(int bagCount, double percent) {
+    	mContents = new HashSet<Tuple>();      
+    	 
+    	long max = Runtime.getRuntime().maxMemory();
+        maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
+        cacheLimit = Integer.MAX_VALUE;
+        
+        // Set the limit to 0 if the memory budget is 0 or very small,
+        // so that all tuples go straight to disk.
+        if (maxMemUsage < 1) {
+        	cacheLimit = 0;
+        }        
+    }
+    
+    public boolean isSorted() {
+        return false;
+    }
+    
+    public boolean isDistinct() {
+        return true;
+    }
+    
+    
+    public long size() {
+        if (mSpillFiles != null && mSpillFiles.size() > 0){
+            // We need to recalculate the size to guarantee a count of unique
+            // entries, including those on disk.
+            Iterator<Tuple> iter = iterator();
+            int newSize = 0;
+            while (iter.hasNext()) {
+                newSize++;
+                iter.next();
+            }
+            
+            mSize = newSize;
+        }
+        return mSize;
+    }
+    
+    
+    public Iterator<Tuple> iterator() {
+        return new DistinctDataBagIterator();
+    }
+
+    @Override
+    public void add(Tuple t) {        
+        
+        if(mReadStarted) {
+            throw new IllegalStateException("InternalDistinctBag is closed for adding new tuples");
+        }
+                
+    	if (mContents.size() > cacheLimit) {    		
+    		spill();
+    	}
+    	            	
+        if (mContents.add(t)) {
+        	mMemSizeChanged = true;
+        	mSize ++;
+                
+        	 // check how many tuples memory can hold by getting average
+            // size of first 100 tuples
+            if(mSize < 100 && (mSpillFiles == null || mSpillFiles.isEmpty())) {
+                memUsage += t.getMemorySize();
+                long avgUsage = memUsage / (long)mContents.size();
+                if (avgUsage >0) {
+                	cacheLimit = (int)(maxMemUsage / avgUsage);
+                	log.debug("Memory can hold "+ cacheLimit + " records.");
+                }
+            }          
+        }    	
+    }
+
+    public void addAll(DataBag b) {
+    	Iterator<Tuple> iter = b.iterator();
+    	while(iter.hasNext()) {
+    		add(iter.next());
+    	}
+    }
+
+    public void addAll(Collection<Tuple> c) {
+    	Iterator<Tuple> iter = c.iterator();
+    	while(iter.hasNext()) {
+    		add(iter.next());
+    	}
+    }
+    
+    public long spill() {       	
+        // Make sure we have something to spill.  Don't create empty
+        // files, as that will make a mess.
+        if (mContents.size() == 0) return 0;
+
+        // Lock the container before I spill, so that iterators aren't
+        // trying to read while I'm mucking with the container.
+        long spilled = 0;
+       
+        DataOutputStream out = null;
+        try {
+            out = getSpillFile();
+        }  catch (IOException ioe) {
+            // Do not remove last file from spilled array. It was not
+            // added as File.createTmpFile threw an IOException
+            warn(
+                "Unable to create tmp file to spill to disk", PigWarning.UNABLE_TO_CREATE_FILE_TO_SPILL, ioe);
+            return 0;
+        }
+        try {            
+        	Tuple[] array = new Tuple[mContents.size()];
+            mContents.toArray(array);
+            Arrays.sort(array);
+            for (int i = 0; i < array.length; i++) {
+                array[i].write(out);
+                spilled++;
+                // This will spill every 16383 records.
+                if ((spilled & 0x3fff) == 0) reportProgress();
+            }
+            
+            out.flush();
+        } catch (IOException ioe) {
+            // Remove the last file from the spilled array, since we failed to
+            // write to it.
+            mSpillFiles.remove(mSpillFiles.size() - 1);
+            warn(
+                "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+            return 0;
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
+                }
+            }
+        }
+        mContents.clear();
+        mMemSizeChanged = true;
+        memUsage = 0;
+                
+        return spilled;
+    }
+
+    /**
+     * An iterator that handles getting the next tuple from the bag.
+     * Data can be stored in a combination of in memory and on disk.  
+     */
+    private class DistinctDataBagIterator implements Iterator<Tuple> {
+
+        private class TContainer implements Comparable<TContainer> {
+            public Tuple tuple;
+            public int fileNum;
+
+            @SuppressWarnings("unchecked")
+			public int compareTo(TContainer other) {
+                return tuple.compareTo(other.tuple);
+            }
+            
+            public boolean equals(Object obj) {
+            	if (obj instanceof TContainer) {
+            		return compareTo((TContainer)obj) == 0;
+            	}
+            	
+            	return false;
+            }
+            
+            public int hashCode() {
+            	return tuple.hashCode();
+            }
+        }
+
+        // We have to buffer a tuple because there's no easy way for next
+        // to tell whether or not there's another tuple available, other
+        // than to read it.
+        private Tuple mBuf = null;
+        private int mMemoryPtr = 0;
+        private TreeSet<TContainer> mMergeTree = null;
+        private ArrayList<DataInputStream> mStreams = null;
+        private int mCntr = 0;
+
+        @SuppressWarnings("unchecked")
+	DistinctDataBagIterator() {
+            // If this is the first read, we need to sort the data.            
+        	if (!mReadStarted) {
+                preMerge();
+                // We're the first reader, we need to sort the data.
+                // This is in case it gets dumped under us.
+                ArrayList<Tuple> l = new ArrayList<Tuple>(mContents);
+                Collections.sort(l);
+                mContents = l;
+                mReadStarted = true;
+            }            
+        }
+
+        public boolean hasNext() { 
+            // See if we can find a tuple.  If so, buffer it.
+            mBuf = next();
+            return mBuf != null;
+        }
+
+        public Tuple next() {
+            // This will report progress every 1024 times through next.
+            // This should be much faster than using mod.
+            if ((mCntr++ & 0x3ff) == 0) reportProgress();
+
+            // If there's one in the buffer, use that one.
+            if (mBuf != null) {
+                Tuple t = mBuf;
+                mBuf = null;
+                return t;
+            }
+
+            // Check to see if we just need to read from memory.                       
+            if (mSpillFiles == null || mSpillFiles.size() == 0) {
+                return readFromMemory();
+            }
+            
+            // We have spill files, so we need to read the next tuple from
+            // one of those files or from memory.
+            return readFromTree();
+        }
+
+        /**
+         * Not implemented.
+         */
+        public void remove() {}
+
+        private Tuple readFromTree() {
+            if (mMergeTree == null) {
+                // First read, we need to set up the queue and the array of
+                // file streams
+                mMergeTree = new TreeSet<TContainer>();
+
+                // Add one to the size in case we spill later.
+                mStreams =
+                    new ArrayList<DataInputStream>(mSpillFiles.size() + 1);
+
+                Iterator<File> i = mSpillFiles.iterator();
+                while (i.hasNext()) {
+                    try {
+                        DataInputStream in = 
+                            new DataInputStream(new BufferedInputStream(
+                                new FileInputStream(i.next())));
+                        mStreams.add(in);
+                        // Add the first tuple from this file into the
+                        // merge queue.
+                        addToQueue(null, mStreams.size() - 1);
+                    } catch (FileNotFoundException fnfe) {
+                        // We can't find our own spill file?  That should
+                        // never happen.
+                        String msg = "Unable to find our spill file.";
+                        log.fatal(msg, fnfe);
+                        throw new RuntimeException(msg, fnfe);
+                    }
+                }
+
+                // Prime one from memory too
+                if (mContents.size() > 0) {
+                    addToQueue(null, -1);
+                }
+            }
+
+            if (mMergeTree.size() == 0) return null;
+
+            // Pop the top one off the queue
+            TContainer c = mMergeTree.first();
+            mMergeTree.remove(c);
+
+            // Add the next tuple from whereever we read from into the
+            // queue.  Buffer the tuple we're returning, as we'll be
+            // reusing c.
+            Tuple t = c.tuple;
+            addToQueue(c, c.fileNum);
+
+            return t;
+        }
+
+        private void addToQueue(TContainer c, int fileNum) {
+            if (c == null) {
+                c = new TContainer();
+            }
+            c.fileNum = fileNum;
+
+            if (fileNum == -1) {
+                // Need to read from memory.  
+                do {
+                    c.tuple = readFromMemory();
+                    if (c.tuple != null) {
+                        // If we find a unique entry, then add it to the queue.
+                        // Otherwise ignore it and keep reading.
+                        if (mMergeTree.add(c)) {
+                            return;
+                        }
+                    }
+                } while (c.tuple != null);
+                return;
+            }
+
+            // Read the next tuple from the indicated file
+            DataInputStream in = mStreams.get(fileNum);
+            if (in != null) {
+                // There's still data in this file
+                c.tuple = gTupleFactory.newTuple();
+                do {
+                    try {
+                        c.tuple.readFields(in);
+                        // If we find a unique entry, then add it to the queue.
+                        // Otherwise ignore it and keep reading.  If we run out
+                        // of tuples to read that's fine, we just won't add a
+                        // new one from this file.
+                        if (mMergeTree.add(c)) {
+                            return;
+                        }
+                    } catch (EOFException eof) {
+                        // Out of tuples in this file.  Set our slot in the
+                        // array to null so we don't keep trying to read from
+                        // this file.
+                        mStreams.set(fileNum, null);
+                        return;
+                    } catch (IOException ioe) {
+                        String msg = "Unable to find our spill file.";
+                        log.fatal(msg, ioe);
+                        throw new RuntimeException(msg, ioe);
+                    }
+                } while (true);
+            }
+        }
+
+        // Function assumes that the reader lock is already held before we enter
+        // this function.
+        private Tuple readFromMemory() {
+            if (mContents.size() == 0) return null;
+
+            if (mMemoryPtr < mContents.size()) {
+                return ((ArrayList<Tuple>)mContents).get(mMemoryPtr++);
+            } else {
+                return null;
+            }
+        }
+
+        /**
+         * Pre-merge if there are too many spill files.  This avoids the issue
+         * of having too large a fan out in our merge.  Experimentation by
+         * the hadoop team has shown that 100 is about the optimal number
+         * of spill files.  This function modifies the mSpillFiles array
+         * and assumes the write lock is already held. It will not unlock it.
+         *
+         * Tuples are reconstituted as tuples, evaluated, and rewritten as
+         * tuples.  This is expensive, but I don't know how to read tuples
+         * from the file otherwise.
+         *
+         * This function is slightly different than the one in
+         * SortedDataBag, as it uses a TreeSet instead of a PriorityQ.
+         */
+        private void preMerge() {
+            if (mSpillFiles == null ||
+                    mSpillFiles.size() <= MAX_SPILL_FILES) {
+                return;
+            }
+
+            // While there are more than max spill files, gather max spill
+            // files together and merge them into one file.  Then remove the others
+            // from mSpillFiles.  The new spill files are attached at the
+            // end of the list, so I can just keep going until I get a
+            // small enough number without too much concern over uneven
+            // size merges.  Convert mSpillFiles to a linked list since
+            // we'll be removing pieces from the middle and we want to do
+            // it efficiently.
+            try {
+                LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+                while (ll.size() > MAX_SPILL_FILES) {
+                    ListIterator<File> i = ll.listIterator();
+                    mStreams =
+                        new ArrayList<DataInputStream>(MAX_SPILL_FILES);
+                    mMergeTree = new TreeSet<TContainer>();
+
+                    for (int j = 0; j < MAX_SPILL_FILES; j++) {
+                        try {
+                            DataInputStream in =
+                                new DataInputStream(new BufferedInputStream(
+                                    new FileInputStream(i.next())));
+                            mStreams.add(in);
+                            addToQueue(null, mStreams.size() - 1);
+                            i.remove();
+                        } catch (FileNotFoundException fnfe) {
+                            // We can't find our own spill file?  That should
+                            // never happen.
+                            String msg = "Unable to find our spill file.";
+                            log.fatal(msg, fnfe);
+                            throw new RuntimeException(msg, fnfe);
+                        }
+                    }
+
+                    // Get a new spill file.  This adds one to the end of
+                    // the spill files list.  So I need to append it to my
+                    // linked list as well so that it's still there when I
+                    // move my linked list back to the spill files.
+                    try {
+                        DataOutputStream out = getSpillFile();
+                        ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
+                        Tuple t;
+                        while ((t = readFromTree()) != null) {
+                            t.write(out);
+                        }
+                        out.flush();
+                    } catch (IOException ioe) {
+                        String msg = "Unable to find our spill file.";
+                        log.fatal(msg, ioe);
+                        throw new RuntimeException(msg, ioe);
+                    }
+                }
+
+                // Now, move our new list back to the spill files array.
+                mSpillFiles = new ArrayList<File>(ll);
+            } finally {
+                // Reset mStreams and mMerge so that they'll be allocated
+                // properly for regular merging.
+                mStreams = null;
+                mMergeTree = null;
+            }
+        }
+    }
+}
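
A minimal usage sketch of the new distinct bag (hypothetical class name; the bag
is instantiated directly, mirroring PODistinct and Distinct above, since it is
not handed out by BagFactory):

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.InternalDistinctBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class InternalDistinctBagSketch {
        public static void main(String[] args) throws ExecException {
            TupleFactory tf = TupleFactory.getInstance();
            // bagCount of 3 matches what PODistinct passes; the per-bag memory
            // budget is the configured heap fraction divided by this count.
            DataBag bag = new InternalDistinctBag(3);
            for (int i = 0; i < 10; i++) {
                Tuple t = tf.newTuple(1);
                t.set(0, i % 5);                 // duplicates are dropped on add
                bag.add(t);
            }
            // Once the first iterator is requested the bag is closed for adds.
            for (Tuple t : bag) {
                System.out.println(t);
            }
            System.out.println(bag.size() + " distinct tuples");
        }
    }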

Added: hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java?rev=830664&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java Wed Oct 28 16:26:58 2009
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.BufferedInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.PriorityQueue;
+  
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigWarning;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+
+
+/**
+ * An ordered collection of Tuples (possibly) with multiples.  Data is
+ * stored unsorted as it comes in, and only sorted when it is time to dump
+ * it to a file or when the first iterator is requested.  Experimentation
+ * found this to be faster than storing it sorted to begin with.
+ * 
+ * We allow a user defined comparator, but provide a default comparator in
+ * cases where the user doesn't specify one.
+ * 
+ * This bag is not registered with the SpillableMemoryManager. It calculates
+ * how many tuples it can hold in memory and proactively spills to files.
+ */
+public class InternalSortedBag extends DefaultAbstractBag{
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 2L;
+
+    private static TupleFactory gTupleFactory = TupleFactory.getInstance();
+
+    private static final Log log = LogFactory.getLog(InternalSortedBag.class);
+
+    private transient Comparator<Tuple> mComp;
+    private transient boolean mReadStarted = false;
+    
+    private transient int cacheLimit;
+    private transient long maxMemUsage;
+    private transient long memUsage;    
+
+    static private class DefaultComparator implements Comparator<Tuple> {
+        @SuppressWarnings("unchecked")
+		public int compare(Tuple t1, Tuple t2) {
+            return t1.compareTo(t2);
+        }
+
+        public boolean equals(Object o) {
+        	return (o == this);
+        }
+
+    }
+    
+    public InternalSortedBag() {
+    	this(null);    	
+    }
+    
+    public InternalSortedBag(Comparator<Tuple> comp) {
+    	this(1, comp);
+    }
+
+    public InternalSortedBag(int bagCount, Comparator<Tuple> comp) {
+    	this(1, -1.0, comp);
+    }
+    
+    public InternalSortedBag(int bagCount, double percent, Comparator<Tuple> comp) {
+    	if (percent < 0) {
+        	percent = 0.1F;            
+        	if (PigMapReduce.sJobConf != null) {
+        		String usage = PigMapReduce.sJobConf.get("pig.cachedbag.memusage");
+        		if (usage != null) {
+        			percent = Float.parseFloat(usage);
+        		}
+        	}
+        }
+           	
+    	init(bagCount, percent, comp);
+    }
+    
+    /**
+     * @param comp Comparator to use to do the sorting.  If null,
+     * DefaultComparator will be used.
+     */
+    private void init(int bagCount, double percent, Comparator<Tuple> comp) {
+        mComp = (comp == null) ? new DefaultComparator() : comp;
+
+        
+    	mContents = new ArrayList<Tuple>();             
+             	 
+    	long max = Runtime.getRuntime().maxMemory();
+        maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
+        cacheLimit = Integer.MAX_VALUE;
+        
+        // Set the limit to 0 if the memory budget is 0 or very small,
+        // so that all tuples go straight to disk.
+        if (maxMemUsage < 1) {
+        	cacheLimit = 0;
+        }        
+    }
+    
+    public void add(Tuple t) {
+    	if(mReadStarted) {
+            throw new IllegalStateException("InternalSortedBag is closed for adding new tuples");
+        }
+                
+    	if (mContents.size() > cacheLimit) {    		
+    		spill();
+    	}
+    	        
+    	mMemSizeChanged = true;
+        mContents.add(t);
+        
+        // check how many tuples memory can hold by getting average
+        // size of first 100 tuples
+        if(mSize < 100 && (mSpillFiles == null || mSpillFiles.isEmpty()))
+        {
+            memUsage += t.getMemorySize();
+            long avgUsage = memUsage / (long)mContents.size();
+            if (avgUsage >0) {
+            	cacheLimit = (int)(maxMemUsage / avgUsage);
+            }
+        }
+                
+        mSize++;
+    }
+    
+    public void addAll(DataBag b) {
+    	Iterator<Tuple> iter = b.iterator();
+    	while(iter.hasNext()) {
+    		add(iter.next());
+    	}
+    }
+
+    public void addAll(Collection<Tuple> c) {
+    	Iterator<Tuple> iter = c.iterator();
+    	while(iter.hasNext()) {
+    		add(iter.next());
+    	}
+    }    
+
+    public boolean isSorted() {
+        return true;
+    }
+    
+    public boolean isDistinct() {
+        return false;
+    }
+    
+    public Iterator<Tuple> iterator() {
+        return new SortedDataBagIterator();
+    }
+
+    public long spill() {
+        // Make sure we have something to spill.  Don't create empty
+        // files, as that will make a mess.
+        if (mContents.size() == 0) return 0;
+
+        // Lock the container before I spill, so that iterators aren't
+        // trying to read while I'm mucking with the container.
+        long spilled = 0;
+        
+        DataOutputStream out = null;
+        try {
+            out = getSpillFile();
+        } catch (IOException ioe) {
+            // Do not remove last file from spilled array. It was not
+            // added as File.createTmpFile threw an IOException
+            warn(
+                "Unable to create tmp file to spill to disk", PigWarning.UNABLE_TO_CREATE_FILE_TO_SPILL, ioe);
+            return 0;
+        }
+        try {
+            
+            Collections.sort((ArrayList<Tuple>)mContents, mComp);
+            
+            Iterator<Tuple> i = mContents.iterator();
+            while (i.hasNext()) {
+                i.next().write(out);
+                spilled++;
+                // This will spill every 16383 records.
+                if ((spilled & 0x3fff) == 0) reportProgress();
+            }
+            out.flush();
+        } catch (IOException ioe) {
+            // Remove the last file from the spilled array, since we failed to
+            // write to it.
+            mSpillFiles.remove(mSpillFiles.size() - 1);
+            warn(
+                "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+            return 0;
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
+                }
+            }
+        }
+        mContents.clear();
+        mMemSizeChanged = true;
+        memUsage = 0;
+        
+        return spilled;
+    }
+
+    /**
+     * An iterator that handles getting the next tuple from the bag.  
+     * Data can be stored in a combination of in memory and on disk.
+     */
+    private class SortedDataBagIterator implements Iterator<Tuple> {
+
+        /**
+         * A container to hold tuples in a priority queue.  Stores the
+         * file number the tuple came from, so that when the tuple is read
+         * out of the queue, we know which file to read its replacement
+         * tuple from.
+         */
+        private class PQContainer implements Comparable<PQContainer> {
+            public Tuple tuple;
+            public int fileNum;
+
+            public int compareTo(PQContainer other) {
+                return mComp.compare(tuple, other.tuple);
+            }
+            
+            public boolean equals(Object obj) {
+            	if (obj instanceof PQContainer) {
+            		return compareTo((PQContainer)obj) == 0;
+            	}
+            	
+            	return false;
+            }
+            
+            public int hashCode() {
+            	return tuple.hashCode();
+            }
+        }
+
+        // We have to buffer a tuple because there's no easy way for next
+        // to tell whether or not there's another tuple available, other
+        // than to read it.
+        private Tuple mBuf = null;
+        private int mMemoryPtr = 0;
+        private PriorityQueue<PQContainer> mMergeQ = null;
+        private ArrayList<DataInputStream> mStreams = null;
+        private int mCntr = 0;
+
+        SortedDataBagIterator() {
+            // If this is the first read, we need to sort the data.            
+        	if (!mReadStarted) {
+                preMerge();
+                Collections.sort((ArrayList<Tuple>)mContents, mComp);
+                mReadStarted = true;
+            }            
+        }
+
+        public boolean hasNext() { 
+            // See if we can find a tuple.  If so, buffer it.
+            mBuf = next();
+            return mBuf != null;
+        }
+
+        public Tuple next() {
+            // This will report progress every 1024 times through next.
+            // This should be much faster than using mod.
+            if ((mCntr++ & 0x3ff) == 0) reportProgress();
+
+            // If there's one in the buffer, use that one.
+            if (mBuf != null) {
+                Tuple t = mBuf;
+                mBuf = null;
+                return t;
+            }           
+            
+            if (mSpillFiles == null || mSpillFiles.size() == 0) {
+                return readFromMemory();
+            }          
+
+            // We have spill files, so we need to read the next tuple from
+            // one of those files or from memory.
+            return readFromPriorityQ();
+        }
+
+        /**
+         * Not implemented.
+         */
+        public void remove() {}
+
+        private Tuple readFromPriorityQ() {
+            if (mMergeQ == null) {
+                // First read, we need to set up the queue and the array of
+                // file streams
+                // Add one to the size for the list in memory.
+                mMergeQ =
+                    new PriorityQueue<PQContainer>(mSpillFiles.size() + 1);
+
+                // Add one to the size in case we spill later.
+                mStreams =
+                    new ArrayList<DataInputStream>(mSpillFiles.size() + 1);
+
+                Iterator<File> i = mSpillFiles.iterator();
+                while (i.hasNext()) {
+                    try {
+                        DataInputStream in = 
+                            new DataInputStream(new BufferedInputStream(
+                                new FileInputStream(i.next())));
+                        mStreams.add(in);
+                        // Add the first tuple from this file into the
+                        // merge queue.
+                        addToQueue(null, mStreams.size() - 1);
+                    } catch (FileNotFoundException fnfe) {
+                        // We can't find our own spill file?  That should
+                        // never happen.
+                        String msg = "Unable to find our spill file."; 
+                        log.fatal(msg, fnfe);
+                        throw new RuntimeException(msg, fnfe);
+                    }
+                }
+
+                // Prime one from memory too
+                if (mContents.size() > 0) {
+                    addToQueue(null, -1);
+                }
+            }
+
+            // Pop the top one off the queue
+            PQContainer c = mMergeQ.poll();
+            if (c == null) return null;
+
+            // Add the next tuple from whereever we read from into the
+            // queue.  Buffer the tuple we're returning, as we'll be
+            // reusing c.
+            Tuple t = c.tuple;
+            addToQueue(c, c.fileNum);
+
+            return t;
+        }
+
+        private void addToQueue(PQContainer c, int fileNum) {
+            if (c == null) {
+                c = new PQContainer();
+            }
+            c.fileNum = fileNum;
+
+            if (fileNum == -1) {
+                // Need to read from memory.  
+                 c.tuple = readFromMemory();
+                if (c.tuple != null) {
+                    mMergeQ.add(c);
+                }
+                return;
+            }
+
+            // Read the next tuple from the indicated file
+            DataInputStream in = mStreams.get(fileNum);
+            if (in != null) {
+                // There's still data in this file
+                c.tuple = gTupleFactory.newTuple();
+                try {
+                    c.tuple.readFields(in);
+                    mMergeQ.add(c);
+                } catch (EOFException eof) {
+                    // Out of tuples in this file.  Set our slot in the
+                    // array to null so we don't keep trying to read from
+                    // this file.
+                    mStreams.set(fileNum, null);
+                } catch (IOException ioe) {
+                    String msg = "Unable to find our spill file.";
+                    log.fatal(msg, ioe);
+                    throw new RuntimeException(msg, ioe);
+                }
+
+            }
+        }
+
+        // Function assumes that the reader lock is already held before we enter
+        // this function.
+        private Tuple readFromMemory() {
+            if (mContents.size() == 0) return null;
+
+            if (mMemoryPtr < mContents.size()) {
+                return ((ArrayList<Tuple>)mContents).get(mMemoryPtr++);
+            } else {
+                return null;
+            }
+        }
+
+        /**
+         * Pre-merge if there are too many spill files.  This avoids the issue
+         * of having too large a fan out in our merge.  Experimentation by
+         * the hadoop team has shown that 100 is about the optimal number
+         * of spill files.  This function modifies the mSpillFiles array
+         * and assumes the write lock is already held. It will not unlock it.
+         *
+         * Tuples are reconstituted as tuples, evaluated, and rewritten as
+         * tuples.  This is expensive, but I need to do this in order to
+         * use the sort spec that was provided to me.
+         */
+        private void preMerge() {
+            if (mSpillFiles == null ||
+                    mSpillFiles.size() <= MAX_SPILL_FILES) {
+                return;
+            }
+
+            // While there are more than max spill files, gather max spill
+            // files together and merge them into one file.  Then remove the others
+            // from mSpillFiles.  The new spill files are attached at the
+            // end of the list, so I can just keep going until I get a
+            // small enough number without too much concern over uneven
+            // size merges.  Convert mSpillFiles to a linked list since
+            // we'll be removing pieces from the middle and we want to do
+            // it efficiently.
+            try {
+                LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
+                while (ll.size() > MAX_SPILL_FILES) {
+                    ListIterator<File> i = ll.listIterator();
+                    mStreams =
+                        new ArrayList<DataInputStream>(MAX_SPILL_FILES);
+                    mMergeQ = new PriorityQueue<PQContainer>(MAX_SPILL_FILES);
+
+                    for (int j = 0; j < MAX_SPILL_FILES; j++) {
+                        try {
+                            DataInputStream in =
+                                new DataInputStream(new BufferedInputStream(
+                                    new FileInputStream(i.next())));
+                            mStreams.add(in);
+                            addToQueue(null, mStreams.size() - 1);
+                            i.remove();
+                        } catch (FileNotFoundException fnfe) {
+                            // We can't find our own spill file?  That should
+                            // never happen.
+                            String msg = "Unable to find our spill file.";
+                            log.fatal(msg, fnfe);
+                            throw new RuntimeException(msg, fnfe);
+                        }
+                    }
+
+                    // Get a new spill file.  This adds one to the end of
+                    // the spill files list.  So I need to append it to my
+                    // linked list as well so that it's still there when I
+                    // move my linked list back to the spill files.
+                    try {
+                        DataOutputStream out = getSpillFile();
+                        ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
+                        Tuple t;
+                        while ((t = readFromPriorityQ()) != null) {
+                            t.write(out);
+                        }
+                        out.flush();
+                    } catch (IOException ioe) {
+                        String msg = "Unable to find our spill file.";
+                        log.fatal(msg, ioe);
+                        throw new RuntimeException(msg, ioe);
+                    }
+                }
+
+                // Now, move our new list back to the spill files array.
+                mSpillFiles = new ArrayList<File>(ll);
+            } finally {
+                // Reset mStreams and mMerge so that they'll be allocated
+                // properly for regular merging.
+                mStreams = null;
+                mMergeQ = null;
+            }
+        }
+    }
+}
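
And a similar sketch for the sorted bag with a caller-supplied comparator (again
a hypothetical class name and a direct instantiation, mirroring POSort; passing
null falls back to the bag's DefaultComparator):

    import java.util.Comparator;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.InternalSortedBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class InternalSortedBagSketch {
        public static void main(String[] args) throws ExecException {
            // Sort descending on field 0.
            Comparator<Tuple> desc = new Comparator<Tuple>() {
                public int compare(Tuple a, Tuple b) {
                    try {
                        return ((Integer) b.get(0)).compareTo((Integer) a.get(0));
                    } catch (ExecException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
            TupleFactory tf = TupleFactory.getInstance();
            DataBag bag = new InternalSortedBag(desc);
            for (int i = 0; i < 5; i++) {
                Tuple t = tf.newTuple(1);
                t.set(0, i);
                bag.add(t);
            }
            // Data is kept unsorted and only sorted when the first iterator is
            // requested or when the bag spills.
            for (Tuple t : bag) {
                System.out.println(t);      // prints (4), (3), (2), (1), (0)
            }
        }
    }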

Modified: hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java Wed Oct 28 16:26:58 2009
@@ -208,17 +208,18 @@
             // same tuples, regardless of order.  Hopefully most of the
             // time the size check above will prevent this.
             // If either bag isn't already sorted, create a sorted bag out
-            // of it so I can guarantee order.
+            // of it so I can guarantee order.           
+            BagFactory factory = BagFactory.getInstance();
+            
             DataBag thisClone;
             DataBag otherClone;
-            thisClone = new SortedDataBag(null);
+            thisClone = factory.newSortedBag(null);
             Iterator<Tuple> i = iterator();
             while (i.hasNext()) thisClone.add(i.next());
-            if (other instanceof SortedDataBag ||
-                    other instanceof DistinctDataBag) {
+            if (bOther.isSorted() || bOther.isDistinct()) {
                 otherClone = bOther;
             } else {
-                otherClone = new SortedDataBag(null);
+                otherClone = factory.newSortedBag(null);
                 i = bOther.iterator();
                 while (i.hasNext()) otherClone.add(i.next());
             }
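
The hunk above keeps the existing comparison strategy and only routes bag creation through BagFactory: when two bags may hold the same tuples in different orders, each side is copied into a sorted bag (the copy is skipped when the other bag is already sorted or distinct) and the two are then walked element by element. A minimal stand-alone sketch of that idea, using lists of strings rather than bags of tuples; the names below are illustrative, not Pig APIs:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class OrderInsensitiveCompare {

    // True when both multisets hold the same elements with the same
    // multiplicities, regardless of the order they were added in.
    public static boolean sameContents(List<String> left, List<String> right) {
        if (left.size() != right.size()) {
            return false;                       // cheap size check first
        }
        List<String> leftClone = new ArrayList<String>(left);
        List<String> rightClone = new ArrayList<String>(right);
        Collections.sort(leftClone);            // the "sorted clone" of each side
        Collections.sort(rightClone);
        return leftClone.equals(rightClone);
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("e", "a", "c");
        List<String> b = Arrays.asList("a", "c", "e");
        System.out.println(sameContents(a, b)); // true: same elements, different order
    }
}

Sorting both clones first turns an order-insensitive multiset comparison into a simple pairwise walk, at the cost of the extra copy that the size check tries to avoid.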

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java?rev=830664&r1=830663&r2=830664&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java Wed Oct 28 16:26:58 2009
@@ -24,6 +24,7 @@
 import org.apache.pig.data.*;
 import org.apache.pig.impl.util.Spillable;
 
+
 /**
  * This class will exercise the basic Pig data model and members. It tests for proper behavior in
  * assigment and comparision, as well as function application.
@@ -75,6 +76,12 @@
         }
     }
 
+    protected void tearDown() throws Exception {
+    	BagFactory.resetSelf();
+        System.clearProperty("pig.data.bag.factory.name");
+        System.clearProperty("pig.data.bag.factory.jar");
+    }
+
     // Test reading and writing default from memory, no spills.
     @Test
     public void testDefaultInMemory() throws Exception {
@@ -672,14 +679,14 @@
     @Test
     public void testDefaultBagFactory() throws Exception {
         BagFactory f = BagFactory.getInstance();
-
+       
         DataBag bag = f.newDefaultBag();
         DataBag sorted = f.newSortedBag(null);
         DataBag distinct = f.newDistinctBag();
 
         assertTrue("Expected a default bag", (bag instanceof DefaultDataBag));
         assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag));
-        assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
+        assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));         
     }
 
     @Test
@@ -792,6 +799,12 @@
     }
     
     public void testInternalCachedBag() throws Exception {    
+    	// check adding empty tuple
+    	DataBag bg0 = new InternalCachedBag();
+    	bg0.add(TupleFactory.getInstance().newTuple());
+    	bg0.add(TupleFactory.getInstance().newTuple());
+    	assertEquals(bg0.size(), 2);
+    	
     	// check equal of bags
     	DataBag bg1 = new InternalCachedBag(1, 0.5f);
     	assertEquals(bg1.size(), 0);
@@ -846,6 +859,229 @@
         bg4.clear();
         assertEquals(bg4.size(), 0);        
     }
+    
+    public void testInternalSortedBag() throws Exception {    
+    	
+    	// check adding empty tuple
+    	DataBag bg0 = new InternalSortedBag();
+    	bg0.add(TupleFactory.getInstance().newTuple());
+    	bg0.add(TupleFactory.getInstance().newTuple());
+    	assertEquals(bg0.size(), 2);
+    	
+    	// check equality of bags
+    	DataBag bg1 = new InternalSortedBag();
+    	assertEquals(bg1.size(), 0);
+    	
+    	String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"c", "d" }};
+    	for (int i = 0; i < tupleContents.length; i++) {
+            bg1.add(Util.createTuple(tupleContents[i]));
+        }
+    	
+    	// check size, and isSorted(), isDistinct()
+    	assertEquals(bg1.size(), 3);
+    	assertTrue(bg1.isSorted());
+    	assertFalse(bg1.isDistinct());
+    	
+    	tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
+    	DataBag bg2 = new InternalSortedBag();
+        for (int i = 0; i < tupleContents.length; i++) {
+             bg2.add(Util.createTuple(tupleContents[i]));
+        }
+        assertEquals(bg1, bg2);
+        
+        Iterator<Tuple> iter = bg1.iterator();
+        assertTrue(iter.next().equals(Util.createTuple(new String[] {"a", "b"})));
+        assertTrue(iter.next().equals(Util.createTuple(new String[] {"c", "d"})));
+        assertTrue(iter.next().equals(Util.createTuple(new String[] {"e", "f"})));
+        
+        // check bag with data written to disk
+        DataBag bg3 = new InternalSortedBag(1, 0.0, null);
+        tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
+        for (int i = 0; i < tupleContents.length; i++) {
+            bg3.add(Util.createTuple(tupleContents[i]));
+        }
+        assertEquals(bg1, bg3);
+        
+        iter = bg3.iterator();
+        assertTrue(iter.next().equals(Util.createTuple(new String[] {"a", "b"})));
+        assertTrue(iter.next().equals(Util.createTuple(new String[] {"c", "d"})));
+        assertTrue(iter.next().equals(Util.createTuple(new String[] {"e", "f"})));
+        
+        // call iterator methods with irregular order
+        iter = bg3.iterator();
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasNext());
+        
+        DataBag bg4 = new InternalSortedBag(1, 0.0, null);
+        bg4.add(iter.next());
+        bg4.add(iter.next());
+        assertTrue(iter.hasNext());
+        bg4.add(iter.next());
+        assertFalse(iter.hasNext());
+        assertFalse(iter.hasNext());
+        assertEquals(bg3, bg4);        
+        
+        // check clear
+        bg3.clear();
+        assertEquals(bg3.size(), 0);
+        
+        // test with all data spilled to disk
+        DataBag bg5 = new InternalSortedBag();        
+        for(int j=0; j<3; j++) {
+        	for (int i = 0; i < tupleContents.length; i++) {
+        		bg5.add(Util.createTuple(tupleContents[i]));
+        	}     
+        	bg5.spill();
+        }
+        
+        assertEquals(bg5.size(), 9);
+        iter = bg5.iterator();
+        for(int i=0; i<3; i++) {
+        	assertTrue(iter.next().equals(Util.createTuple(new String[] {"a", "b"})));
+        }
+        for(int i=0; i<3; i++) {
+        	assertTrue(iter.next().equals(Util.createTuple(new String[] {"c", "d"})));
+        }
+        for(int i=0; i<3; i++) {
+        	assertTrue(iter.next().equals(Util.createTuple(new String[] {"e", "f"})));
+        }
+        
+        // test with most of the data spilled to disk, some still in memory,
+        // and a merge of the spill files
+        DataBag bg6 = new InternalSortedBag();        
+        for(int j=0; j<104; j++) {
+        	for (int i = 0; i < tupleContents.length; i++) {
+        		bg6.add(Util.createTuple(tupleContents[i]));
+        	}        	
+        	if (j != 103) {
+        		bg6.spill();
+        	}
+        }
+        
+        assertEquals(bg6.size(), 104*3);
+        iter = bg6.iterator();
+        for(int i=0; i<104; i++) {
+        	assertTrue(iter.next().equals(Util.createTuple(new String[] {"a", "b"})));
+        }
+        for(int i=0; i<104; i++) {
+        	assertTrue(iter.next().equals(Util.createTuple(new String[] {"c", "d"})));
+        }
+        for(int i=0; i<104; i++) {
+        	assertTrue(iter.next().equals(Util.createTuple(new String[] {"e", "f"})));
+        }
+        
+        // check that the two implementations of sorted bag compare correctly
+        DataBag bg7 = new SortedDataBag(null);        
+        for(int j=0; j<104; j++) {
+        	for (int i = 0; i < tupleContents.length; i++) {
+        		bg7.add(Util.createTuple(tupleContents[i]));
+        	}        	
+        	if (j != 103) {
+        		bg7.spill();
+        	}
+        }
+        assertEquals(bg6, bg7);
+    }
+    
+    public void testInternalDistinctBag() throws Exception {    
+    	// check adding empty tuple
+    	DataBag bg0 = new InternalDistinctBag();
+    	bg0.add(TupleFactory.getInstance().newTuple());
+    	bg0.add(TupleFactory.getInstance().newTuple());
+    	assertEquals(bg0.size(), 1);
+    	
+    	// check equality of bags
+    	DataBag bg1 = new InternalDistinctBag();
+    	assertEquals(bg1.size(), 0);
+    	
+    	String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
+    	for (int i = 0; i < tupleContents.length; i++) {
+            bg1.add(Util.createTuple(tupleContents[i]));
+        }
+    	
+    	// check size, and isSorted(), isDistinct()
+    	assertEquals(bg1.size(), 3);
+    	assertFalse(bg1.isSorted());
+    	assertTrue(bg1.isDistinct());
+    	
+    	tupleContents = new String[][] {{"a", "b" }, {"e", "d"}, {"e", "d"}, { "e", "f"} };
+    	DataBag bg2 = new InternalDistinctBag();
+        for (int i = 0; i < tupleContents.length; i++) {
+             bg2.add(Util.createTuple(tupleContents[i]));
+        }
+        assertEquals(bg1, bg2);
+        
+        Iterator<Tuple> iter = bg1.iterator();
+        iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
+        iter.next().equals(Util.createTuple(new String[] {"e", "d"}));
+        iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+        
+        // check bag with data written to disk
+        DataBag bg3 = new InternalDistinctBag(1, 0.0);
+        tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
+        for (int i = 0; i < tupleContents.length; i++) {
+            bg3.add(Util.createTuple(tupleContents[i]));
+        }
+        assertEquals(bg2, bg3);
+        assertEquals(bg3.size(), 3);
+              
+        
+        // call iterator methods with irregular order
+        iter = bg3.iterator();
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasNext());
+        
+        DataBag bg4 = new InternalDistinctBag(1, 0.0);
+        bg4.add(iter.next());
+        bg4.add(iter.next());
+        assertTrue(iter.hasNext());
+        bg4.add(iter.next());
+        assertFalse(iter.hasNext());
+        assertFalse(iter.hasNext());
+        assertEquals(bg3, bg4);        
+        
+        // check clear
+        bg3.clear();
+        assertEquals(bg3.size(), 0);
+        
+        // test with all data spilled to disk
+        DataBag bg5 = new InternalDistinctBag();        
+        for(int j=0; j<3; j++) {
+        	for (int i = 0; i < tupleContents.length; i++) {
+        		bg5.add(Util.createTuple(tupleContents[i]));
+        	}        
+        	bg5.spill();
+        }
+        
+        assertEquals(bg5.size(), 3);
+    
+        
+        // test with most of the data spilled to disk, some still in memory,
+        // and a merge of the spill files
+        DataBag bg6 = new InternalDistinctBag();        
+        for(int j=0; j<104; j++) {
+        	for (int i = 0; i < tupleContents.length; i++) {
+        		bg6.add(Util.createTuple(tupleContents[i]));
+        	}        	
+        	if (j != 103) {
+        		bg6.spill();
+        	}
+        }
+        
+        assertEquals(bg6.size(), 3);       
+        
+        // check that the two implementations of distinct bag compare correctly
+        DataBag bg7 = new DistinctDataBag();        
+        for(int j=0; j<104; j++) {
+        	for (int i = 0; i < tupleContents.length; i++) {
+        		bg7.add(Util.createTuple(tupleContents[i]));
+        	}        	
+        	if (j != 103) {
+        		bg7.spill();
+        	}
+        }
+        assertEquals(bg6, bg7);
+    }
 }
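
The tests above construct InternalSortedBag and InternalDistinctBag directly; normal user code obtains bags through BagFactory, the same entry points that testDefaultBagFactory exercises and that tearDown() resets via BagFactory.resetSelf() and the pig.data.bag.factory.name / pig.data.bag.factory.jar properties. A short, hedged usage sketch (assumes the Pig classes are on the classpath; which concrete bag class the factory returns depends on how it is configured):

import java.util.Iterator;

import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class BagFactoryUsage {
    public static void main(String[] args) throws Exception {
        TupleFactory tf = TupleFactory.getInstance();
        BagFactory bf = BagFactory.getInstance();

        DataBag sorted = bf.newSortedBag(null);   // null comparator = natural tuple order
        DataBag distinct = bf.newDistinctBag();

        for (String s : new String[] { "e", "a", "c", "a" }) {
            Tuple t = tf.newTuple(1);
            t.set(0, s);
            sorted.add(t);
            distinct.add(t);
        }

        System.out.println(sorted.size());        // 4: duplicates kept, iterated in sorted order
        System.out.println(distinct.size());      // 3: the duplicate "a" tuple collapses

        Iterator<Tuple> it = sorted.iterator();
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}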