You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC

svn commit: r883836 [20/23] - in /hadoop/pig/branches/load-store-redesign: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/ contrib/zebra/ contrib/zebr...

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java Tue Nov 24 19:54:19 2009
@@ -50,7 +50,7 @@
 
     protected static final long serialVersionUID = 1L;
 
-    protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
+    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
     protected static BagFactory mBagFactory = BagFactory.getInstance();
 
     protected List<PhysicalPlan> plans;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java Tue Nov 24 19:54:19 2009
@@ -18,6 +18,7 @@
 
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -26,6 +27,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
@@ -42,6 +44,10 @@
 
 	
 	private static final long serialVersionUID = 1L;
+	private boolean[] mInnerFlags;
+	
+	// The schema is used only by the MRCompiler to support outer join
+	transient private List<Schema> inputSchema = new ArrayList<Schema>();
 	
 	transient private static Log log = LogFactory.getLog(POSkewedJoin.class);
 	
@@ -51,19 +57,30 @@
     private MultiMap<PhysicalOperator, PhysicalPlan> mJoinPlans;
 
     public POSkewedJoin(OperatorKey k)  {
-        this(k,-1,null);
+        this(k,-1,null, null);
     }
 
     public POSkewedJoin(OperatorKey k, int rp) {
-        this(k, rp, null);
+        this(k, rp, null, null);
     }
 
-    public POSkewedJoin(OperatorKey k, List<PhysicalOperator> inp) {
-        this(k, -1, inp);
+    public POSkewedJoin(OperatorKey k, List<PhysicalOperator> inp, boolean []flags) {
+        this(k, -1, inp, flags);
     }
 
-    public POSkewedJoin(OperatorKey k, int rp, List<PhysicalOperator> inp) {
-        super(k,rp,inp);      
+    public POSkewedJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, boolean []flags) {
+        super(k,rp,inp);
+        if (flags != null) {
+        	// copy the inner flags
+        	mInnerFlags = new boolean[flags.length];
+        	for (int i = 0; i < flags.length; i++) {
+        		mInnerFlags[i] = flags[i];
+        	}
+        }
+    }
+    
+    public boolean[] getInnerFlags() {
+    	return mInnerFlags;
     }
     
     public MultiMap<PhysicalOperator, PhysicalPlan> getJoinPlans() {
@@ -93,5 +110,13 @@
 	public boolean supportsMultipleOutputs() {	
 		return false;
 	}
+	
+	public void addSchema(Schema s) {
+		inputSchema.add(s);
+	}
+	
+	public Schema getSchema(int i) {
+		return inputSchema.get(i);
+	}
 
 }

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSortedDistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSortedDistinct.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSortedDistinct.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSortedDistinct.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+
+/**
+ * This operator is a variation of PODistinct; the input to this operator
+ * must be sorted already. 
+ *
+ */
+public class POSortedDistinct extends PODistinct {
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+	private transient Tuple lastTuple;
+
+	public POSortedDistinct(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    public POSortedDistinct(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+
+    public POSortedDistinct(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+    }
+
+    public POSortedDistinct(OperatorKey k) {
+        super(k);
+    }
+    
+
+    public Result getNext(Tuple t) throws ExecException {
+    	while(true) {
+    		Result in = processInput();
+    		if (in.returnStatus == POStatus.STATUS_NULL) {
+    			continue;
+    		}
+    		
+    		if (in.returnStatus == POStatus.STATUS_OK) {
+    			if (lastTuple == null || !lastTuple.equals(in.result)) {
+    				lastTuple = (Tuple)in.result;
+    				return in;
+    			} else {
+    				continue;
+    			}
+    		}
+    		
+    		if (in.returnStatus == POStatus.STATUS_EOP) {
+    			if (!isAccumulative() || !isAccumStarted()) {
+    				lastTuple = null;
+    			}
+    			return in;
+    		}
+    		
+    		// if there is an error, just return
+    		return in;
+    	}
+    }
+}

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Tue Nov 24 19:54:19 2009
@@ -95,12 +95,6 @@
 
         Map<E, Pair<Integer, Integer> > reducerMap = new HashMap<E, Pair<Integer, Integer> >();
 
-        InputStream is;
-        if (job != null) {
-            is = FileLocalizer.openDFSFile(keyDistFile,ConfigurationUtil.toProperties(job));
-        } else {
-            is = FileLocalizer.openDFSFile(keyDistFile);
-        }
         ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(), job, 
                 keyDistFile, 0);
         DataBag partitionList;
@@ -142,13 +136,7 @@
                 }
             }
             // number of reducers
-            Integer cnt = 0;
-            if (minIndex < maxIndex) {
-                cnt = maxIndex - minIndex;
-            } else {
-                cnt = totalReducers[0] + maxIndex - minIndex;
-            }
-
+            Integer cnt = maxIndex - minIndex;
             reducerMap.put(keyT, new Pair(minIndex, cnt));// 1 is added to account for the 0 index
         }
         return reducerMap;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java Tue Nov 24 19:54:19 2009
@@ -24,9 +24,9 @@
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Scanner;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.pig.Slice;
@@ -61,7 +61,7 @@
     /** The connection to the table in Hbase **/
     private transient HTable m_table;
     /** The scanner over the table **/
-    private transient Scanner m_scanner;
+    private transient ResultScanner m_scanner;
 
     private transient ArrayList<Object> mProtoTuple;
 
@@ -153,7 +153,8 @@
     @Override
     public void init(DataStorage store) throws IOException {
         LOG.info("Init Hbase Slice " + this);
-        HBaseConfiguration conf = new HBaseConfiguration();
+        
+        HBaseConfiguration conf=new HBaseConfiguration();
         // connect to the given table
         m_table = new HTable(conf, m_tableName);
         // init the scanner
@@ -178,17 +179,18 @@
      * @throws IOException
      */
     private void restart(byte[] startRow) throws IOException {
+	Scan scan;
         if ((m_endRow != null) && (m_endRow.length > 0)) {
-            this.m_scanner = this.m_table.getScanner(m_inputColumns, startRow,
-                    m_endRow);
+	    scan = new Scan(startRow, m_endRow);
         } else {
-            this.m_scanner = this.m_table.getScanner(m_inputColumns, startRow);
+	    scan = new Scan(startRow);
         }
+	this.m_scanner = this.m_table.getScanner(scan);
     }
 
     @Override
     public boolean next(Tuple value) throws IOException {
-        RowResult result;
+        Result result;
         try {
             result = this.m_scanner.next();
         } catch (UnknownScannerException e) {
@@ -215,15 +217,14 @@
      * @param tuple
      *            tuple
      */
-    private void convertResultToTuple(RowResult result, Tuple tuple) {
+    private void convertResultToTuple(Result result, Tuple tuple) {
         if (mProtoTuple == null)
             mProtoTuple = new ArrayList<Object>();
 
-        Cell cell = null;
         byte[] value = null;
         for (byte[] column : m_inputColumns) {
-            cell = result.get(column);
-            if (cell == null || (value = cell.getValue()) == null) {
+            value = result.getValue(column);
+            if (value == null) {
                 mProtoTuple.add(null);
             } else {
                 mProtoTuple.add(new DataByteArray(value));

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/datastorage/LocalPath.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/datastorage/LocalPath.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/datastorage/LocalPath.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/datastorage/LocalPath.java Tue Nov 24 19:54:19 2009
@@ -26,12 +26,15 @@
 import java.util.HashMap;
 import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.datastorage.SeekableInputStream;
 
 public abstract class LocalPath implements ElementDescriptor {
 
+    private Log log = LogFactory.getLog(getClass());
     protected DataStorage fs;
     protected File path;
 
@@ -121,7 +124,9 @@
     }
 
     public void delete() throws IOException {
-        getCurPath().delete();
+        boolean res = getCurPath().delete();
+        if (!res)
+            log.warn("LocalPath.delete: failed to delete" + getCurPath());
     }
 
     public Properties getConfiguration() throws IOException {

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java Tue Nov 24 19:54:19 2009
@@ -268,10 +268,6 @@
 	    return result;
 	}
 	
-	public boolean equals(Object obj) {
-	    return this.equals(obj);
-	}
-	
     }
 
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/AVG.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/AVG.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/AVG.java Tue Nov 24 19:54:19 2009
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -41,7 +42,7 @@
  * Generates the average of the values of the first field of a tuple. This class is Algebraic in
  * implemenation, so if possible the execution will be split into a local and global application
  */
-public class AVG extends EvalFunc<Double> implements Algebraic {
+public class AVG extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
     
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
@@ -178,6 +179,7 @@
         for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
             Tuple t = it.next();
             Double d = (Double)t.get(0);
+            
             // we count nulls in avg as contributing 0
             // a departure from SQL for performance of 
             // COUNT() which implemented by just inspecting
@@ -261,5 +263,53 @@
         funcList.add(new FuncSpec(IntAvg.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
         funcList.add(new FuncSpec(LongAvg.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
         return funcList;
+    }
+
+    /* Accumulator interface implementation */
+    
+    private Double intermediateSum = null;
+    private Double intermediateCount = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double sum = sum(b);
+            if(sum == null) {
+                return;
+            }
+            // set default values
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0.0;
+                intermediateCount = 0.0;
+            }
+            
+            double count = (Long)count(b);
+
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }        
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    @Override
+    public Double getValue() {
+        Double avg = null;
+        if (intermediateCount > 0) {
+            avg = new Double(intermediateSum / intermediateCount);
+        }
+        return avg;
     }    
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java Tue Nov 24 19:54:19 2009
@@ -323,6 +323,10 @@
         return new BinStorageInputFormat();
     }
 
+    public int hashCode() {
+        return 42; 
+    }
+
     @Override
     public LoadCaster getLoadCaster() {
         return this;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinaryStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinaryStorage.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinaryStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinaryStorage.java Tue Nov 24 19:54:19 2009
@@ -146,6 +146,10 @@
         return true;
     }
 
+    public int hashCode() {
+        return 42; 
+    }
+
     @Override
     public InputFormat getInputFormat() throws IOException {
         return null;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT.java Tue Nov 24 19:54:19 2009
@@ -21,6 +21,7 @@
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -36,7 +37,7 @@
  * Generates the count of the values of the first field of a tuple. This class is Algebraic in
  * implemenation, so if possible the execution will be split into a local and global functions
  */
-public class COUNT extends EvalFunc<Long> implements Algebraic{
+public class COUNT extends EvalFunc<Long> implements Algebraic, Accumulator<Long>{
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     @Override
@@ -136,5 +137,38 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
     }
+    
+    /* Accumulator interface implementation */
+    private long intermediateCount = 0L;
+
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            DataBag bag = (DataBag)b.get(0);
+            Iterator it = bag.iterator();
+            while (it.hasNext()){
+                Tuple t = (Tuple)it.next();
+                if (t != null && t.size() > 0 && t.get(0) != null) {
+                    intermediateCount += 1;
+                }
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateCount = 0L;
+    }
+
+    @Override
+    public Long getValue() {
+        return intermediateCount;
+    }
 
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT_STAR.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT_STAR.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT_STAR.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT_STAR.java Tue Nov 24 19:54:19 2009
@@ -21,6 +21,7 @@
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -38,7 +39,7 @@
  * implements SQL COUNT(*) semantics. This class is Algebraic in
  * implemenation, so if possible the execution will be split into a local and global functions
  */
-public class COUNT_STAR extends EvalFunc<Long> implements Algebraic{
+public class COUNT_STAR extends EvalFunc<Long> implements Algebraic, Accumulator<Long>{
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     @Override
@@ -127,4 +128,31 @@
         return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
     }
 
+    /* Accumulator interface implementation */
+    
+    private long intermediateCount = 0L;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            intermediateCount += sum(b);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateCount = 0L;
+    }
+
+    @Override
+    public Long getValue() {
+        return intermediateCount;
+    }
+
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleAvg.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleAvg.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleAvg.java Tue Nov 24 19:54:19 2009
@@ -21,6 +21,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -38,7 +39,7 @@
  * Generates the average of the values of the first field of a tuple. This class is Algebraic in
  * implemenation, so if possible the execution will be split into a local and global application
  */
-public class DoubleAvg extends EvalFunc<Double> implements Algebraic {
+public class DoubleAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
     
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
@@ -233,4 +234,52 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
+    
+    /* Accumulator interface */
+    
+    private Double intermediateSum = null;
+    private Double intermediateCount = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double sum = sum(b);
+            if(sum == null) {
+                return;
+            }
+            // set default values
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0.0;
+                intermediateCount = 0.0;
+            }
+            
+            double count = (Long)count(b);
+
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }        
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    @Override
+    public Double getValue() {
+        Double avg = null;
+        if (intermediateCount > 0) {
+            avg = new Double(intermediateSum / intermediateCount);
+        }
+        return avg;
+    }    
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMax.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMax.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMax.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -34,7 +35,7 @@
 /**
  * Generates the max of the values of the first field of a tuple.
  */
-public class DoubleMax extends EvalFunc<Double> implements Algebraic {
+public class DoubleMax extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
 
     @Override
     public Double exec(Tuple input) throws IOException {
@@ -154,5 +155,40 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
+
+    /* Accumulator interface */
+    
+    private Double intermediateMax = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curMax = max(b);
+            if (curMax == null) {
+                return;
+            }
+            /* if bag is not null, initialize intermediateMax to negative infinity */
+            if (intermediateMax == null) {
+                intermediateMax = Double.NEGATIVE_INFINITY;
+            }
+            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateMax;
+    }
     
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMin.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
 /**
  * Generates the min of the Double values in the first field of a tuple.
  */
-public class DoubleMin extends EvalFunc<Double> implements Algebraic {
+public class DoubleMin extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
 
     @Override
     public Double exec(Tuple input) throws IOException {
@@ -152,4 +153,38 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
+    
+    /* Accumulator interface implementation */
+    private Double intermediateMin = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curMin = min(b);
+            if (curMin == null) {
+                return;
+            }
+            /* if bag is not null, initialize intermediateMin to positive infinity */
+            if (intermediateMin == null) {
+                intermediateMin = Double.POSITIVE_INFINITY;
+            }
+            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateMin;
+    }    
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleSum.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleSum.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleSum.java Tue Nov 24 19:54:19 2009
@@ -21,6 +21,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -38,7 +39,7 @@
 /**
  * Generates the sum of the values of the first field of a tuple.
  */
-public class DoubleSum extends EvalFunc<Double> implements Algebraic {
+public class DoubleSum extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
 
     @Override
     public Double exec(Tuple input) throws IOException {
@@ -160,4 +161,34 @@
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
     
+    /* Accumulator interface implementation*/
+    private Double intermediateSum = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curSum = sum(b);
+            if (curSum == null) {
+                return;
+            }
+            intermediateSum = (intermediateSum == null ? 0.0 : intermediateSum) + curSum;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateSum;
+    }    
+    
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatAvg.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatAvg.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatAvg.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
  * Generates the average of the values of the first field of a tuple. This class is Algebraic in
  * implementation, so if possible the execution will be split into a local and global application
  */
-public class FloatAvg extends EvalFunc<Double> implements Algebraic {
+public class FloatAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
     
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
@@ -230,5 +231,53 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
+    
+    /* Accumulator interface */
+
+    private Double intermediateSum = null;
+    private Double intermediateCount = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double sum = sum(b);
+            if(sum == null) {
+                return;
+            }
+            // set default values
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0.0;
+                intermediateCount = 0.0;
+            }
+            
+            double count = (Long)count(b);
+            
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }        
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    @Override
+    public Double getValue() {
+        // The counters stay null until accumulate() has seen a non-null sum (and after cleanup()); return null, not an NPE.
+        if (intermediateCount == null || intermediateCount <= 0) {
+            return null;
+        }
+        return Double.valueOf(intermediateSum / intermediateCount);
+    }
 
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMax.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMax.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMax.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
 /**
  * Generates the max of the values of the first field of a tuple.
  */
-public class FloatMax extends EvalFunc<Float> implements Algebraic {
+public class FloatMax extends EvalFunc<Float> implements Algebraic, Accumulator<Float> {
 
     @Override
     public Float exec(Tuple input) throws IOException {
@@ -152,4 +153,39 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.FLOAT)); 
     }
+    
+    /* Accumulator interface */
+    
+    private Float intermediateMax = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Float curMax = max(b);
+            if (curMax == null) {
+                return; // max(b) yielded nothing: leave the running maximum untouched
+            }
+            /* first non-null partial max: start intermediateMax at negative infinity (the identity for max) */
+            if (intermediateMax == null) {
+                intermediateMax = Float.NEGATIVE_INFINITY;
+            }
+            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+        } catch (ExecException ee) {
+            throw ee; // already an ExecException: propagate unchanged
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    @Override
+    public Float getValue() {
+        return intermediateMax;
+    }
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMin.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
 /**
  * Generates the min of the Float values in the first field of a tuple.
  */
-public class FloatMin extends EvalFunc<Float> implements Algebraic {
+public class FloatMin extends EvalFunc<Float> implements Algebraic, Accumulator<Float> {
 
     @Override
     public Float exec(Tuple input) throws IOException {
@@ -152,4 +153,38 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.FLOAT)); 
     }
+    
+    /* Accumulator interface implementation */
+    private Float intermediateMin = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Float curMin = min(b);
+            if (curMin == null) {
+                return; // min(b) yielded nothing: leave the running minimum untouched
+            }
+            /* first non-null partial min: start intermediateMin at positive infinity (the identity for min) */
+            if (intermediateMin == null) {
+                intermediateMin = Float.POSITIVE_INFINITY;
+            }
+            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+        } catch (ExecException ee) {
+            throw ee; // already an ExecException: propagate unchanged
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    @Override
+    public Float getValue() {
+        return intermediateMin;
+    }    
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatSum.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatSum.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatSum.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
 /**
  * Generates the sum of the Float values in the first field of a tuple.
  */
-public class FloatSum extends EvalFunc<Double> implements Algebraic {
+public class FloatSum extends EvalFunc<Double> implements Algebraic, Accumulator<Double>{
 
     @Override
     public Double exec(Tuple input) throws IOException {
@@ -196,5 +197,35 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
+    
+    /* Accumulator interface implementation*/
+    private Double intermediateSum = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curSum = sum(b);
+            if (curSum == null) {
+                return; // sum(b) yielded nothing: leave the running sum untouched (it may still be null)
+            }
+            intermediateSum = (intermediateSum == null ? 0.0 : intermediateSum) + curSum;
+        } catch (ExecException ee) {
+            throw ee; // already an ExecException: propagate unchanged
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateSum;
+    }    
 
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntAvg.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntAvg.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntAvg.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
  * Generates the average of the values of the first field of a tuple. This class is Algebraic in
  * implementation, so if possible the execution will be split into a local and global application
  */
-public class IntAvg extends EvalFunc<Double> implements Algebraic {
+public class IntAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
     
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
@@ -236,4 +237,51 @@
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
 
+    /* Accumulator interface */
+
+    private Long intermediateSum = null;
+    private Double intermediateCount = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long sum = sum(b);
+            if(sum == null) {
+                return;
+            }
+            // set default values
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0L;
+                intermediateCount = 0.0;
+            }
+            
+            double count = (Long)count(b);
+
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }        
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    @Override
+    public Double getValue() {
+        // The counters stay null until accumulate() has seen a non-null sum (and after cleanup()); return null, not an NPE.
+        if (intermediateCount == null || intermediateCount <= 0) {
+            return null;
+        }
+        return Double.valueOf(intermediateSum / intermediateCount);
+    }
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMax.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMax.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMax.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
 /**
  * Generates the max of the values of the first field of a tuple.
  */
-public class IntMax extends EvalFunc<Integer> implements Algebraic {
+public class IntMax extends EvalFunc<Integer> implements Algebraic, Accumulator<Integer> {
 
     @Override
     public Integer exec(Tuple input) throws IOException {
@@ -153,4 +154,39 @@
         return new Schema(new Schema.FieldSchema(null, DataType.INTEGER)); 
     }
     
+    /* Accumulator interface */
+    
+    private Integer intermediateMax = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Integer curMax = max(b);
+            if (curMax == null) {
+                return; // max(b) yielded nothing: leave the running maximum untouched
+            }
+            /* first non-null partial max: start intermediateMax at Integer.MIN_VALUE (the identity for max over ints) */
+            if (intermediateMax == null) {
+                intermediateMax = Integer.MIN_VALUE;
+            }
+            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+        } catch (ExecException ee) {
+            throw ee; // already an ExecException: propagate unchanged
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    @Override
+    public Integer getValue() {
+        return intermediateMax;
+    }
+    
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMin.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
 /**
  * Generates the min of the Integer values in the first field of a tuple.
  */
-public class IntMin extends EvalFunc<Integer> implements Algebraic {
+public class IntMin extends EvalFunc<Integer> implements Algebraic, Accumulator<Integer> {
 
     @Override
     public Integer exec(Tuple input) throws IOException {
@@ -154,4 +155,38 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.INTEGER)); 
     }
+    
+    /* Accumulator interface implementation */
+    private Integer intermediateMin = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Integer curMin = min(b);
+            if (curMin == null) {
+                return; // min(b) yielded nothing: leave the running minimum untouched
+            }
+            /* first non-null partial min: start intermediateMin at Integer.MAX_VALUE (the identity for min over ints) */
+            if (intermediateMin == null) {
+                intermediateMin = Integer.MAX_VALUE;
+            }
+            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+        } catch (ExecException ee) {
+            throw ee; // already an ExecException: propagate unchanged
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    @Override
+    public Integer getValue() {
+        return intermediateMin;
+    }    
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntSum.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntSum.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntSum.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -34,7 +35,7 @@
 /**
  * Generates the sum of the Integer in the first field of a tuple.
  */
-public class IntSum extends EvalFunc<Long> implements Algebraic {
+public class IntSum extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
 
     @Override
     public Long exec(Tuple input) throws IOException {
@@ -198,4 +199,33 @@
         return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
     }
 
+    /* Accumulator interface implementation*/
+    private Long intermediateSum = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long curSum = sum(b);
+            if (curSum == null) {
+                return; // sum(b) yielded nothing: leave the running sum untouched (it may still be null)
+            }
+            intermediateSum = (intermediateSum == null ? 0L : intermediateSum) + curSum;
+        } catch (ExecException ee) {
+            throw ee; // already an ExecException: propagate unchanged
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+    }
+
+    @Override
+    public Long getValue() {
+        return intermediateSum;
+    }    
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongAvg.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongAvg.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongAvg.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
  * Generates the average of the values of the first field of a tuple. This class is Algebraic in
  * implementation, so if possible the execution will be split into a local and global application
  */
-public class LongAvg extends EvalFunc<Double> implements Algebraic {
+public class LongAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
     
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
@@ -230,5 +231,53 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
+    
+    /* Accumulator interface */
+   
+    private Long intermediateSum = null;
+    private Double intermediateCount = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long sum = sum(b);
+            if(sum == null) {
+                return;
+            }
+            // set default values
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0L;
+                intermediateCount = 0.0;
+            }
+            
+            double count = (Long)count(b);
+
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }        
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    @Override
+    public Double getValue() {
+        // The counters stay null until accumulate() has seen a non-null sum (and after cleanup()); return null, not an NPE.
+        if (intermediateCount == null || intermediateCount <= 0) {
+            return null;
+        }
+        return Double.valueOf(intermediateSum / intermediateCount);
+    }
 
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMax.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMax.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMax.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
 /**
  * Generates the max of the values of the first field of a tuple.
  */
-public class LongMax extends EvalFunc<Long> implements Algebraic {
+public class LongMax extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
 
     @Override
     public Long exec(Tuple input) throws IOException {
@@ -152,4 +153,39 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
     }
+    
+    /* Accumulator interface */
+    
+    private Long intermediateMax = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long curMax = max(b);
+            if (curMax == null) {
+                return; // max(b) yielded nothing: leave the running maximum untouched
+            }
+            /* first non-null partial max: start intermediateMax at Long.MIN_VALUE (the identity for max over longs) */
+            if (intermediateMax == null) {
+                intermediateMax = Long.MIN_VALUE;
+            }
+            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+        } catch (ExecException ee) {
+            throw ee; // already an ExecException: propagate unchanged
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    @Override
+    public Long getValue() {
+        return intermediateMax;
+    }
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMin.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
 /**
  * Generates the min of the Long values in the first field of a tuple.
  */
-public class LongMin extends EvalFunc<Long> implements Algebraic {
+public class LongMin extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
 
     @Override
     public Long exec(Tuple input) throws IOException {
@@ -152,4 +153,38 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
     }
+    
+    /* Accumulator interface implementation */
+    private Long intermediateMin = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long curMin = min(b);
+            if (curMin == null) {
+                return; // min(b) yielded nothing: leave the running minimum untouched
+            }
+            /* first non-null partial min: start intermediateMin at Long.MAX_VALUE (the identity for min over longs) */
+            if (intermediateMin == null) {
+                intermediateMin = Long.MAX_VALUE;
+            }
+            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+        } catch (ExecException ee) {
+            throw ee; // already an ExecException: propagate unchanged
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    @Override
+    public Long getValue() {
+        return intermediateMin;
+    }    
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongSum.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongSum.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongSum.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
 /**
  * Generates the sum of the Long values in the first field of a tuple.
  */
-public class LongSum extends EvalFunc<Long> implements Algebraic {
+public class LongSum extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
 
     @Override
     public Long exec(Tuple input) throws IOException {
@@ -155,5 +156,35 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
     }
+    
+    /* Accumulator interface implementation*/
+    private Long intermediateSum = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long curSum = sum(b);
+            if (curSum == null) {
+                return; // sum(b) yielded nothing: leave the running sum untouched (it may still be null)
+            }
+            intermediateSum = (intermediateSum == null ? 0L : intermediateSum) + curSum;
+        } catch (ExecException ee) {
+            throw ee; // already an ExecException: propagate unchanged
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+    }
+
+    @Override
+    public Long getValue() {
+        return intermediateSum;
+    }    
 
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MAX.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MAX.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MAX.java Tue Nov 24 19:54:19 2009
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -40,7 +41,7 @@
 /**
  * Generates the max of the values of the first field of a tuple.
  */
-public class MAX extends EvalFunc<Double> implements Algebraic {
+public class MAX extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
 
     @Override
     public Double exec(Tuple input) throws IOException {
@@ -216,5 +217,39 @@
         funcList.add(new FuncSpec(LongMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
         funcList.add(new FuncSpec(StringMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));
         return funcList;
+    }
+
+    /* Accumulator interface implementation */
+    private Double intermediateMax = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curMax = max(b);
+            if (curMax == null) {
+                return; // max(b) yielded nothing: leave the running maximum untouched
+            }
+            /* first non-null partial max: start intermediateMax at negative infinity (the identity for max) */
+            if (intermediateMax == null) {
+                intermediateMax = Double.NEGATIVE_INFINITY;
+            }
+            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+        } catch (ExecException ee) {
+            throw ee; // already an ExecException: propagate unchanged
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateMax;
     }    
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MIN.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MIN.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MIN.java Tue Nov 24 19:54:19 2009
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -40,7 +41,7 @@
 /**
  * Generates the min of the values of the first field of a tuple.
  */
-public class MIN extends EvalFunc<Double> implements Algebraic {
+public class MIN extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
 
     @Override
     public Double exec(Tuple input) throws IOException {
@@ -217,5 +218,39 @@
         funcList.add(new FuncSpec(LongMin.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
         funcList.add(new FuncSpec(StringMin.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));
         return funcList;
+    }
+
+    /* Accumulator interface implementation */
+    private Double intermediateMin = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curMin = min(b);
+            if (curMin == null) {
+                return;
+            }
+            /* first non-null batch value: seed intermediateMin with positive infinity */
+            if (intermediateMin == null) {
+                intermediateMin = Double.POSITIVE_INFINITY;
+            }
+            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMin = null; // forget the running min so the next accumulation starts empty
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateMin; // null when accumulate() has folded in no non-null value since cleanup()
     }    
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigDump.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigDump.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigDump.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigDump.java Tue Nov 24 19:54:19 2009
@@ -33,7 +33,7 @@
 // XXX: FIXME - make this work with new load-store redesign
 public class PigDump implements StoreFunc {
 
-    public static String recordDelimiter = "\n";
+    public static final String recordDelimiter = "\n";
     
 
     OutputStream os;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java Tue Nov 24 19:54:19 2009
@@ -291,7 +291,10 @@
 
     @Override
     public boolean equals(Object obj) {
-        return equals((PigStorage)obj);
+        if (obj instanceof PigStorage)
+            return equals((PigStorage)obj);
+        else
+            return false;
     }
 
     public boolean equals(PigStorage other) {
@@ -341,5 +344,8 @@
         return LoadFunc.getAbsolutePath(location, curDir);
     }
 
-   
+    public int hashCode() {
+        return (int)fieldDel; // hash is derived from the field delimiter alone
+    }
+
  }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/SUM.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/SUM.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/SUM.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/SUM.java Tue Nov 24 19:54:19 2009
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -39,7 +40,7 @@
 /**
  * Generates the sum of the values of the first field of a tuple.
  */
-public class SUM extends EvalFunc<Double> implements Algebraic {
+public class SUM extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
 
     @Override
     public Double exec(Tuple input) throws IOException {
@@ -222,6 +223,36 @@
         funcList.add(new FuncSpec(IntSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
         funcList.add(new FuncSpec(LongSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
         return funcList;
+    }
+
+    /* Accumulator interface implementation*/
+    private Double intermediateSum = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double batchSum = sum(b);
+            if (batchSum != null) {
+                double runningTotal = (intermediateSum == null) ? 0.0 : intermediateSum; // absent total counts as 0.0
+                intermediateSum = runningTotal + batchSum;
+            }
+        } catch (ExecException ee) {
+            throw ee; // rethrow unchanged instead of wrapping it a second time
+        } catch (Exception e) {
+            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+            int errCode = 2106;
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null; // forget the running sum so the next accumulation starts empty
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateSum; // null when accumulate() has folded in no non-null value since cleanup()
     }    
     
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMax.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMax.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMax.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
 /**
  * Generates the max of the values of the first field of a tuple.
  */
-public class StringMax extends EvalFunc<String> implements Algebraic {
+public class StringMax extends EvalFunc<String> implements Algebraic, Accumulator<String> {
 
     @Override
     public String exec(Tuple input) throws IOException {
@@ -152,4 +153,39 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)); 
     }
+
+
+    /* accumulator interface */
+    private String intermediateMax = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException { // folds the max of one batch into intermediateMax
+        try {
+            String curMax = max(b);
+            if (curMax == null) {
+                return; // max(b) produced no value: leave the running max untouched
+            }
+            // adopt curMax when nothing is held yet or the held value sorts before it
+            if (intermediateMax == null || intermediateMax.compareTo(curMax) < 0) {
+                intermediateMax = curMax;
+            }
+
+        } catch (ExecException ee) {
+            throw ee; // rethrow unchanged instead of wrapping it a second time
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMax = null; // forget the running max so the next accumulation starts empty
+    }
+
+    @Override
+    public String getValue() {
+        return intermediateMax; // null when accumulate() has folded in no non-null value since cleanup()
+    }
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMin.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
 /**
  * Generates the min of the String values in the first field of a tuple.
  */
-public class StringMin extends EvalFunc<String> implements Algebraic {
+public class StringMin extends EvalFunc<String> implements Algebraic, Accumulator<String> {
 
     @Override
     public String exec(Tuple input) throws IOException {
@@ -154,5 +155,39 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)); 
     }
+    
+    /* accumulator interface */
+    private String intermediateMin = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException { // folds the min of one batch into intermediateMin
+        try {
+            String curMin = min(b);
+            if (curMin == null) {
+                return; // min(b) produced no value: leave the running min untouched
+            }
+            // adopt curMin when nothing is held yet or the held value sorts after it
+            if (intermediateMin == null || intermediateMin.compareTo(curMin) > 0) {
+                intermediateMin = curMin;
+            }
+
+        } catch (ExecException ee) {
+            throw ee; // rethrow unchanged instead of wrapping it a second time
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMin = null; // forget the running min so the next accumulation starts empty
+    }
+
+    @Override
+    public String getValue() {
+        return intermediateMin; // null when accumulate() has folded in no non-null value since cleanup()
+    }
 
 }

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/AccumulativeBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/AccumulativeBag.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/AccumulativeBag.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/AccumulativeBag.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
+
+public class AccumulativeBag implements DataBag {
+    private static final long serialVersionUID = 1L;
+    // Read-only bag: iteration is served by buffer.getTuples(index); mutation, size, compare and I/O are rejected.
+    private transient AccumulativeTupleBuffer buffer;
+    private int index;
+
+    public AccumulativeBag(AccumulativeTupleBuffer buffer, int index) {
+        this.buffer = buffer;
+        this.index = index;
+    }
+
+    public void add(Tuple t) { // always throws: the bag cannot be modified
+        throw new UnsupportedOperationException("AccumulativeBag does not support add operation");
+    }
+
+    public void addAll(DataBag b) { // always throws: the bag cannot be modified
+        throw new UnsupportedOperationException("AccumulativeBag does not support add operation");
+    }
+
+    public void clear() { // always throws: the bag cannot be modified
+        throw new UnsupportedOperationException("AccumulativeBag does not support clear operation");
+    }
+
+    public boolean isDistinct() {
+        return false;
+    }
+
+    public boolean isSorted() {
+        return false;
+    }
+
+    public AccumulativeTupleBuffer getTuplebuffer() {
+        return buffer; // the backing buffer passed to the constructor
+    }
+
+    public Iterator<Tuple> iterator() {
+        return buffer.getTuples(index); // the buffer supplies the tuples for this bag's index
+    }
+
+    public void markStale(boolean stale) {
+        // intentionally a no-op
+    }
+
+    public long size() { // always throws: no element count is available
+        throw new UnsupportedOperationException("AccumulativeBag does not support size() operation");
+    }
+
+    public long getMemorySize() {
+        return 0; // this bag reports no memory of its own
+    }
+
+    public long spill() {
+        return 0; // nothing is spilled by this bag
+    }
+
+    public void readFields(DataInput datainput) throws IOException {
+        throw new IOException("AccumulativeBag does not support readFields operation");
+    }
+
+    public void write(DataOutput dataoutput) throws IOException {
+        throw new IOException("AccumulativeBag does not support write operation");
+    }
+
+    public int compareTo(Object other) { // always throws: ordering is not defined for this bag
+        throw new UnsupportedOperationException("AccumulativeBag does not support compareTo() operation");
+    }
+
+    public boolean equals(Object other) { // identity comparison only
+        if (this == other) {
+            return true;
+        }
+
+        return false;
+    }
+
+    public int hashCode() {
+        assert false : "hashCode not designed"; // fails fast when assertions are enabled (-ea)
+        return 42; // constant fallback when assertions are disabled
+    }
+
+}

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultAbstractBag.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultAbstractBag.java Tue Nov 24 19:54:19 2009
@@ -29,6 +29,7 @@
 import java.util.ArrayList;
 
 import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
@@ -161,7 +162,9 @@
             mContents.clear();
             if (mSpillFiles != null) {
                 for (int i = 0; i < mSpillFiles.size(); i++) {
-                    mSpillFiles.get(i).delete();
+                    boolean res = mSpillFiles.get(i).delete();
+                    if (!res)
+                        warn ("DefaultAbstractBag.clear: failed to delete " + mSpillFiles.get(i), PigWarning.DELETE_FAILED, null);  
                 }
                 mSpillFiles.clear();
             }
@@ -298,7 +301,10 @@
     protected void finalize() {
         if (mSpillFiles != null) {
             for (int i = 0; i < mSpillFiles.size(); i++) {
-                mSpillFiles.get(i).delete();
+                boolean res = mSpillFiles.get(i).delete();
+                if (!res)
+                    warn ("DefaultAbstractBag.finalize: failed to delete " + mSpillFiles.get(i), PigWarning.DELETE_FAILED, null);
+                    
             }
         }
     }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java Tue Nov 24 19:54:19 2009
@@ -203,6 +203,17 @@
             public int compareTo(TContainer other) {
                 return tuple.compareTo(other.tuple);
             }
+
+            public boolean equals(Object obj){
+                if (obj instanceof TContainer) // null and foreign types fall through to false
+                    return tuple.equals(((TContainer)obj).tuple);
+                else
+                    return false;
+            }
+
+            public int hashCode() {
+                return tuple.hashCode(); // delegates to the wrapped tuple, as equals() does
+            }
         }
 
         // We have to buffer a tuple because there's no easy way for next

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java Tue Nov 24 19:54:19 2009
@@ -83,6 +83,9 @@
         	return (o == this);
         }
 
+        public int hashCode() {
+            return 42; // same value for every instance: legal, but all instances collide in hash-based collections
+        }
     }
     
     public InternalSortedBag() {

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/NonSpillableDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/NonSpillableDataBag.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/NonSpillableDataBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/NonSpillableDataBag.java Tue Nov 24 19:54:19 2009
@@ -192,6 +192,10 @@
         return compareTo(obj) == 0;
     }
 
+    public int hashCode() {
+        return mContents.hashCode(); // NOTE(review): equals() is compareTo(obj) == 0; confirm equal bags always hash alike
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public int compareTo(Object other) {

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SingleTupleBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SingleTupleBag.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SingleTupleBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SingleTupleBag.java Tue Nov 24 19:54:19 2009
@@ -157,6 +157,15 @@
         return 0;
     }
 
+    public boolean equals(Object o){
+        // TODO: match to compareTo if it is updated
+        return true; // NOTE(review): true for every argument, null and non-bag objects included; breaks the Object.equals contract
+    }
+
+    public int hashCode() {
+        return 42; // same value for every instance: legal, but all instances collide in hash-based collections
+    }
+
     class TBIterator implements Iterator<Tuple> {
         boolean nextDone = false;
         /* (non-Javadoc)

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java Tue Nov 24 19:54:19 2009
@@ -71,6 +71,10 @@
             return false;
         }
 
+        public int hashCode() {
+            return 42; // same value for every instance: legal, but all instances collide in hash-based collections
+        }
+
     }
 
     /**
@@ -177,6 +181,17 @@
             public int compareTo(PQContainer other) {
                 return mComp.compare(tuple, other.tuple);
             }
+
+            public boolean equals(Object other) {
+                if (other instanceof PQContainer) // null and foreign types fall through to false
+                    return tuple.equals(((PQContainer)other).tuple); // NOTE(review): compareTo() orders via mComp; the two may disagree
+                else
+                    return false;
+            }
+
+            public int hashCode() {
+                return tuple.hashCode(); // delegates to the wrapped tuple, as equals() does
+            }
         }
 
         // We have to buffer a tuple because there's no easy way for next

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/TargetedTuple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/TargetedTuple.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/TargetedTuple.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/TargetedTuple.java Tue Nov 24 19:54:19 2009
@@ -151,6 +151,15 @@
         return t.compareTo(o);
     }
     
+    @SuppressWarnings("unchecked")
+    public boolean equals(Object o) {
+        return t.equals(o); // compares the wrapped tuple t with o as given; o is not unwrapped first
+    }
+
+    public int hashCode() {
+        return t.hashCode(); // delegates to the wrapped tuple t, as equals() does
+    }
+    
     /**
      * @return true if this Tuple is null
      */

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/PigContext.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/PigContext.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/PigContext.java Tue Nov 24 19:54:19 2009
@@ -185,15 +185,6 @@
 
         switch (execType) {
             case LOCAL:
-            {
-                lfs = new HDataStorage(URI.create("file:///"),
-                                       new Properties());
-                
-                dfs = lfs;
-                executionEngine = new LocalExecutionEngine(this);
-            }
-            break;
-
             case MAPREDUCE:
             {
                 executionEngine = new HExecutionEngine (this);
@@ -203,7 +194,7 @@
                 dfs = executionEngine.getDataStorage();
                 
                 lfs = new HDataStorage(URI.create("file:///"),
-                                        new Properties());                
+                                        new Properties()); 
             }
             break;
             
@@ -331,11 +322,7 @@
     }
 
     public DataStorage getFs() {
-        if(execType == ExecType.LOCAL) {
-            return lfs;
-        } else {
-            return dfs;
-        }
+        return dfs;
     }
     
     /**
@@ -573,10 +560,6 @@
 
         switch (execType) {
             case LOCAL:
-            {
-                executableManager = new ExecutableManager();
-            }
-            break;
             case MAPREDUCE: 
             {
                 executableManager = new HadoopExecutableManager();
@@ -628,9 +611,7 @@
      * @return error source
      */
     public byte getErrorSource() {
-        if(execType == ExecType.LOCAL) {
-            return PigException.USER_ENVIRONMENT;
-        } else if (execType == ExecType.MAPREDUCE) {
+        if(execType == ExecType.LOCAL || execType == ExecType.MAPREDUCE) {
             return PigException.REMOTE_ENVIRONMENT;
         } else {
             return PigException.BUG;