Posted to commits@pig.apache.org by da...@apache.org on 2009/11/12 19:33:18 UTC

svn commit: r835487 [2/3] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengin...

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java Thu Nov 12 18:33:15 2009
@@ -56,6 +56,11 @@
 
     @Override
     public Result getNext(Boolean bool) throws ExecException {
+        Result r = accumChild(null, dummyString);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result left, right;
 

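The accumChild() call above is the new hook for accumulative execution: before evaluating tuple-at-a-time, the operator gives its child expressions a chance to consume the current batch, and if they do, it short-circuits with STATUS_BATCH_OK. The same guard is added to the other expression operators in this commit (see Subtract below). The helper itself lives in ExpressionOperator, elsewhere in r835487; the following is only a sketch of the assumed contract, not the committed body:

    // Sketch of the assumed ExpressionOperator.accumChild() contract.
    protected Result accumChild(List<ExpressionOperator> children, String dummy)
            throws ExecException {
        if (!isAccumulative()) {
            return null;                       // normal mode: caller evaluates as usual
        }
        if (children == null) {
            children = getChildExpressions();  // e.g. the operands of this PORegexp
        }
        if (isAccumStarted()) {
            for (ExpressionOperator child : children) {
                child.getNext(dummy);          // push the current batch downward
            }
            Result res = new Result();
            res.returnStatus = POStatus.STATUS_BATCH_OK;
            return res;                        // caller waits for the next batch
        }
        return null;                           // batches done: caller computes final value
    }
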
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java Thu Nov 12 18:33:15 2009
@@ -105,6 +105,13 @@
     @Override
     public Result getNext(DataBag db) throws ExecException {
         Result input = processInputBag();
+        
+        // if this is called during accumulation, it is ok to have an empty bag
+        // we need to send STATUS_OK so that the UDF can be called.
+        if (isAccumulative()) {
+            reset();
+        }
+        
         if(input.returnStatus!=POStatus.STATUS_OK) {
             if(input.returnStatus == POStatus.STATUS_EOP && sendEmptyBagOnEOP)  {
                 // we received an EOP from the predecessor

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java Thu Nov 12 18:33:15 2009
@@ -43,108 +43,108 @@
 @SuppressWarnings("unchecked")
 public class POUserComparisonFunc extends ExpressionOperator {
 
-	/**
+    /**
      * 
      */
     private static final long serialVersionUID = 1L;
     FuncSpec funcSpec;
     Tuple t1, t2;
     transient ComparisonFunc func;
-	transient private Log log = LogFactory.getLog(getClass());
-	
-	public POUserComparisonFunc(OperatorKey k, int rp, List inp, FuncSpec funcSpec, ComparisonFunc func) {
+    transient private Log log = LogFactory.getLog(getClass());
+    
+    public POUserComparisonFunc(OperatorKey k, int rp, List inp, FuncSpec funcSpec, ComparisonFunc func) {
         super(k, rp);
         super.setInputs(inp);
         this.funcSpec = funcSpec;
-		this.func = func;
+        this.func = func;
         if(func==null)
             instantiateFunc();
-	}
-	
-	public POUserComparisonFunc(OperatorKey k, int rp, List inp, FuncSpec funcSpec) {
-		this(k, rp, inp, funcSpec, null);
-	}
-	
-	private void instantiateFunc() {
-		this.func = (ComparisonFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
+    }
+    
+    public POUserComparisonFunc(OperatorKey k, int rp, List inp, FuncSpec funcSpec) {
+        this(k, rp, inp, funcSpec, null);
+    }
+    
+    private void instantiateFunc() {
+        this.func = (ComparisonFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
         this.func.setReporter(reporter);
-	}
-	
-	public ComparisonFunc getComparator() {
-		return func;
-	}
-	
-	@Override
-	public Result getNext(Integer i) throws ExecException {
-		Result result = new Result();
-
-		result.result = func.compare(t1, t2);
-		result.returnStatus = (t1 != null && t2 != null) ? POStatus.STATUS_OK
-				: POStatus.STATUS_ERR;
-		// the two attached tuples are used up now. So we set the
-		// inputAttached flag to false
-		inputAttached = false;
-		return result;
-
-	}
-	
-	private Result getNext() {
-		Result res = null;
-		log.error("getNext being called with non-integer");
-		return res;
-	}
-	
-	@Override
-	public Result getNext(Boolean b) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(DataBag db) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(DataByteArray ba) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(Double d) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(Float f) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(Long l) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(Map m) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(String s) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(Tuple in) throws ExecException {
-		return getNext();
-	}
-
-	public void attachInput(Tuple t1, Tuple t2) {
-		this.t1 = t1;
-		this.t2 = t2;
-		inputAttached = true;
+    }
+    
+    public ComparisonFunc getComparator() {
+        return func;
+    }
+    
+    @Override
+    public Result getNext(Integer i) throws ExecException {
+        Result result = new Result();
+
+        result.result = func.compare(t1, t2);
+        result.returnStatus = (t1 != null && t2 != null) ? POStatus.STATUS_OK
+                : POStatus.STATUS_ERR;
+        // the two attached tuples are used up now. So we set the
+        // inputAttached flag to false
+        inputAttached = false;
+        return result;
+
+    }
+    
+    private Result getNext() {
+        Result res = null;
+        log.error("getNext being called with non-integer");
+        return res;
+    }
+    
+    @Override
+    public Result getNext(Boolean b) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(DataBag db) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(DataByteArray ba) throws ExecException {
+        return getNext();
+    }
 
-	}
+    @Override
+    public Result getNext(Double d) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Float f) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Long l) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Map m) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(String s) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Tuple in) throws ExecException {
+        return getNext();
+    }
+
+    public void attachInput(Tuple t1, Tuple t2) {
+        this.t1 = t1;
+        this.t2 = t2;
+        inputAttached = true;
+
+    }
     
     private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
         is.defaultReadObject();
@@ -184,4 +184,12 @@
         return clone;
     }
 
+    /**
+     * Get child expressions of this expression
+     */
+    @Override
+    public List<ExpressionOperator> getChildExpressions() {
+        return null;
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Thu Nov 12 18:33:15 2009
@@ -26,6 +26,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -51,35 +52,35 @@
 
 public class POUserFunc extends ExpressionOperator {
 
-	/**
+    /**
      * 
      */
     private static final long serialVersionUID = 1L;
     transient EvalFunc func;
-	
-	transient private final Log log = LogFactory.getLog(getClass());
-	FuncSpec funcSpec;
+    
+    transient private final Log log = LogFactory.getLog(getClass());
+    FuncSpec funcSpec;
     FuncSpec origFSpec;
-	public static final byte INITIAL = 0;
-	public static final byte INTERMEDIATE = 1;
-	public static final byte FINAL = 2;
-	private boolean initialized = false;
-
-	public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
-		super(k, rp);
-		inputs = inp;
+    public static final byte INITIAL = 0;
+    public static final byte INTERMEDIATE = 1;
+    public static final byte FINAL = 2;
+    private boolean initialized = false;
 
-	}
+    public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp);
+        inputs = inp;
 
-	public POUserFunc(
+    }
+
+    public POUserFunc(
             OperatorKey k,
             int rp,
             List<PhysicalOperator> inp,
             FuncSpec funcSpec) {
-		this(k, rp, inp, funcSpec, null);
-	}
-	
-	public POUserFunc(
+        this(k, rp, inp, funcSpec, null);
+    }
+    
+    public POUserFunc(
             OperatorKey k,
             int rp,
             List<PhysicalOperator> inp,
@@ -87,60 +88,60 @@
             EvalFunc func) {
         super(k, rp);
         super.setInputs(inp);
-		this.funcSpec = funcSpec;
+        this.funcSpec = funcSpec;
         this.origFSpec = funcSpec;
-		this.func = func;
+        this.func = func;
         instantiateFunc(funcSpec);
-	}
+    }
 
-	private void instantiateFunc(FuncSpec fSpec) {
-		this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
-		//the next couple of initializations do not work as intended for the following reasons
-		//the reporter and pigLogger are member variables of PhysicalOperator
-		//when instanitateFunc is invoked at deserialization time, both
-		//reporter and pigLogger are null. They are set during map and reduce calls,
-		//making the initializations here basically useless. Look at the processInput
-		//method where these variables are re-initialized. At that point, the PhysicalOperator
-		//is set up correctly with the reporter and pigLogger references
+    private void instantiateFunc(FuncSpec fSpec) {
+        this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
+        //the next couple of initializations do not work as intended for the following reasons
+        //the reporter and pigLogger are member variables of PhysicalOperator
+        //when instantiateFunc is invoked at deserialization time, both
+        //reporter and pigLogger are null. They are set during map and reduce calls,
+        //making the initializations here basically useless. Look at the processInput
+        //method where these variables are re-initialized. At that point, the PhysicalOperator
+        //is set up correctly with the reporter and pigLogger references
         this.func.setReporter(reporter);
         this.func.setPigLogger(pigLogger);
-	}
-	
-	public Result processInput() throws ExecException {
+    }
+    
+    public Result processInput() throws ExecException {
 
         // Make sure the reporter is set, because it isn't getting carried
         // across in the serialization (don't know why).  I suspect it's as
         // cheap to call the setReporter call everytime as to check whether I
         // have (hopefully java will inline it).
         if(!initialized) {
-        	func.setReporter(reporter);
-        	func.setPigLogger(pigLogger);
-        	initialized = true;
+            func.setReporter(reporter);
+            func.setPigLogger(pigLogger);
+            initialized = true;
         }
 
-		Result res = new Result();
-		Tuple inpValue = null;
-		if (input == null && (inputs == null || inputs.size()==0)) {
+        Result res = new Result();
+        Tuple inpValue = null;
+        if (input == null && (inputs == null || inputs.size()==0)) {
 //			log.warn("No inputs found. Signaling End of Processing.");
-			res.returnStatus = POStatus.STATUS_EOP;
-			return res;
-		}
-
-		//Should be removed once the model is clear
-		if(reporter!=null) reporter.progress();
-
-		
-		if(isInputAttached()) {
-			res.result = input;
-			res.returnStatus = POStatus.STATUS_OK;
-			detachInput();
-			return res;
-		} else {
-			res.result = TupleFactory.getInstance().newTuple();
-			
-			Result temp = null;
-			for(PhysicalOperator op : inputs) {
-				switch(op.getResultType()){
+            res.returnStatus = POStatus.STATUS_EOP;
+            return res;
+        }
+
+        //Should be removed once the model is clear
+        if(reporter!=null) reporter.progress();
+
+        
+        if(isInputAttached()) {
+            res.result = input;
+            res.returnStatus = POStatus.STATUS_OK;
+            detachInput();
+            return res;
+        } else {
+            res.result = TupleFactory.getInstance().newTuple();
+            
+            Result temp = null;
+            for(PhysicalOperator op : inputs) {
+                switch(op.getResultType()){
                 case DataType.BAG:
                     temp = op.getNext(dummyBag);
                     break;
@@ -187,210 +188,222 @@
                     }
                 }
                 ((Tuple)res.result).append(temp.result);
-			}
-			res.returnStatus = temp.returnStatus;
-			return res;
-		}
-	}
+            }
+            res.returnStatus = temp.returnStatus;
+            return res;
+        }
+    }
 
-	private Result getNext() throws ExecException {
-        Result result = processInput();
+    private Result getNext() throws ExecException {
+        Result result = processInput();        
         String errMsg = "";
-		try {
-			if(result.returnStatus == POStatus.STATUS_OK) {
-				result.result = func.exec((Tuple) result.result);
+        try {						
+            if(result.returnStatus == POStatus.STATUS_OK) {
+                if (isAccumulative()) {
+                    if (isAccumStarted()) {							
+                        ((Accumulator)func).accumulate((Tuple)result.result);
+                        result.returnStatus = POStatus.STATUS_BATCH_OK;
+                        result.result = null;
+                    }else{												
+                        result.result = ((Accumulator)func).getValue();	
+                        ((Accumulator)func).cleanup();
+                    }
+                } else {					
+                    result.result = func.exec((Tuple) result.result);
+                }
                 if(resultType == DataType.BYTEARRAY) {
                     if(res.result != null && DataType.findType(result.result) != DataType.BYTEARRAY) {
                         result.result = new DataByteArray(result.result.toString().getBytes());
                     }
                 }
-				return result;
-			}
-			return result;
-		} catch (ExecException ee) {
-		    throw ee;
-		} catch (IOException ioe) {
-		    int errCode = 2078;
-		    String msg = "Caught error from UDF: " + funcSpec.getClassName(); 
+                return result;
+            }
+                        
+            return result;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (IOException ioe) {
+            int errCode = 2078;
+            String msg = "Caught error from UDF: " + funcSpec.getClassName(); 
             String footer = " [" + ioe.getMessage() + "]";
-		    
-		    if(ioe instanceof PigException) {
-		        int udfErrorCode = ((PigException)ioe).getErrorCode();
-		        if(udfErrorCode != 0) {
-		            errCode = udfErrorCode;
-		            msg = ((PigException)ioe).getMessage();
-		        } else {
-		            msg += " [" + ((PigException)ioe).getMessage() + " ]";
-		        }
-		    } else {
-		        msg += footer;
-		    }
-		    
-			throw new ExecException(msg, errCode, PigException.BUG, ioe);
-		} catch (IndexOutOfBoundsException ie) {
+            
+            if(ioe instanceof PigException) {
+                int udfErrorCode = ((PigException)ioe).getErrorCode();
+                if(udfErrorCode != 0) {
+                    errCode = udfErrorCode;
+                    msg = ((PigException)ioe).getMessage();
+                } else {
+                    msg += " [" + ((PigException)ioe).getMessage() + " ]";
+                }
+            } else {
+                msg += footer;
+            }
+            
+            throw new ExecException(msg, errCode, PigException.BUG, ioe);
+        } catch (IndexOutOfBoundsException ie) {
             int errCode = 2078;
             String msg = "Caught error from UDF: " + funcSpec.getClassName() + 
             ", Out of bounds access [" + ie.getMessage() + "]";
             throw new ExecException(msg, errCode, PigException.BUG, ie);
-    	}
-	}
+        }
+    }
 
-	@Override
-	public Result getNext(Tuple tIn) throws ExecException {
-		return getNext();
-	}
+    @Override
+    public Result getNext(Tuple tIn) throws ExecException {
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(DataBag db) throws ExecException {
-		return getNext();
-	}
+    @Override
+    public Result getNext(DataBag db) throws ExecException {
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(Integer i) throws ExecException {
-		return getNext();
-	}
+    @Override
+    public Result getNext(Integer i) throws ExecException {
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(Boolean b) throws ExecException {
+    @Override
+    public Result getNext(Boolean b) throws ExecException {
 
-		return getNext();
-	}
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(DataByteArray ba) throws ExecException {
+    @Override
+    public Result getNext(DataByteArray ba) throws ExecException {
 
-		return getNext();
-	}
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(Double d) throws ExecException {
+    @Override
+    public Result getNext(Double d) throws ExecException {
 
-		return getNext();
-	}
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(Float f) throws ExecException {
+    @Override
+    public Result getNext(Float f) throws ExecException {
 
-		return getNext();
-	}
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(Long l) throws ExecException {
+    @Override
+    public Result getNext(Long l) throws ExecException {
 
-		return getNext();
-	}
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(Map m) throws ExecException {
+    @Override
+    public Result getNext(Map m) throws ExecException {
 
-		return getNext();
-	}
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(String s) throws ExecException {
+    @Override
+    public Result getNext(String s) throws ExecException {
 
-		return getNext();
-	}
+        return getNext();
+    }
 
-	public void setAlgebraicFunction(byte Function) throws ExecException {
-		// This will only be used by the optimizer for putting correct functions
-		// in the mapper,
-		// combiner and reduce. This helps in maintaining the physical plan as
-		// is without the
-		// optimiser having to replace any operators.
-		// You wouldn't be able to make two calls to this function on the same
-		// algebraic EvalFunc as
-		// func is being changed.
-		switch (Function) {
-		case INITIAL:
+    public void setAlgebraicFunction(byte Function) throws ExecException {
+        // This will only be used by the optimizer for putting correct functions
+        // in the mapper,
+        // combiner and reduce. This helps in maintaining the physical plan as
+        // is without the
+        // optimiser having to replace any operators.
+        // You wouldn't be able to make two calls to this function on the same
+        // algebraic EvalFunc as
+        // func is being changed.
+        switch (Function) {
+        case INITIAL:
             funcSpec = new FuncSpec(getInitial());
-			break;
-		case INTERMEDIATE:
+            break;
+        case INTERMEDIATE:
             funcSpec = new FuncSpec(getIntermed());
-			break;
-		case FINAL:
+            break;
+        case FINAL:
             funcSpec = new FuncSpec(getFinal());
-			break;
+            break;
 
-		}
+        }
         instantiateFunc(funcSpec);
         setResultType(DataType.findType(((EvalFunc<?>) func).getReturnType()));
-	}
+    }
 
-	public String getInitial() throws ExecException {
-	    instantiateFunc(origFSpec);
-		if (func instanceof Algebraic) {
-			return ((Algebraic) func).getInitial();
-		} else {
-		    int errCode = 2072;
-			String msg = "Attempt to run a non-algebraic function"
+    public String getInitial() throws ExecException {
+        instantiateFunc(origFSpec);
+        if (func instanceof Algebraic) {
+            return ((Algebraic) func).getInitial();
+        } else {
+            int errCode = 2072;
+            String msg = "Attempt to run a non-algebraic function"
                 + " as an algebraic function";
             throw new ExecException(msg, errCode, PigException.BUG);
-		}
-	}
+        }
+    }
 
-	public String getIntermed() throws ExecException {
+    public String getIntermed() throws ExecException {
         instantiateFunc(origFSpec);
-		if (func instanceof Algebraic) {
-			return ((Algebraic) func).getIntermed();
-		} else {
+        if (func instanceof Algebraic) {
+            return ((Algebraic) func).getIntermed();
+        } else {
             int errCode = 2072;
             String msg = "Attempt to run a non-algebraic function"
                 + " as an algebraic function";
             throw new ExecException(msg, errCode, PigException.BUG);
-		}
-	}
+        }
+    }
 
-	public String getFinal() throws ExecException {
+    public String getFinal() throws ExecException {
         instantiateFunc(origFSpec);
-		if (func instanceof Algebraic) {
-			return ((Algebraic) func).getFinal();
-		} else {
+        if (func instanceof Algebraic) {
+            return ((Algebraic) func).getFinal();
+        } else {
             int errCode = 2072;
             String msg = "Attempt to run a non-algebraic function"
                 + " as an algebraic function";
             throw new ExecException(msg, errCode, PigException.BUG);
-		}
-	}
+        }
+    }
 
-	public Type getReturnType() {
-		return func.getReturnType();
-	}
+    public Type getReturnType() {
+        return func.getReturnType();
+    }
 
-	public void finish() {
-		func.finish();
-	}
+    public void finish() {
+        func.finish();
+    }
 
-	public Schema outputSchema(Schema input) {
-		return func.outputSchema(input);
-	}
+    public Schema outputSchema(Schema input) {
+        return func.outputSchema(input);
+    }
 
-	public Boolean isAsynchronous() {
-		return func.isAsynchronous();
-	}
+    public Boolean isAsynchronous() {
+        return func.isAsynchronous();
+    }
 
-	@Override
-	public String name() {
-	    return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
-	}
+    @Override
+    public String name() {
+        return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
+    }
 
-	@Override
-	public boolean supportsMultipleInputs() {
+    @Override
+    public boolean supportsMultipleInputs() {
 
-		return true;
-	}
+        return true;
+    }
 
-	@Override
-	public boolean supportsMultipleOutputs() {
+    @Override
+    public boolean supportsMultipleOutputs() {
 
-		return false;
-	}
+        return false;
+    }
 
-	@Override
-	public void visit(PhyPlanVisitor v) throws VisitorException {
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
 
-		v.visitUserFunc(this);
-	}
+        v.visitUserFunc(this);
+    }
 
     public FuncSpec getFuncSpec() {
         return funcSpec;
@@ -414,4 +427,12 @@
         is.defaultReadObject();
         instantiateFunc(funcSpec);
     }
+
+    /**
+     * Get child expression of this expression
+     */
+    @Override
+    public List<ExpressionOperator> getChildExpressions() {		
+        return null;
+    }
 }

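POUserFunc.getNext() now implements a three-phase protocol: while a batch is in progress the UDF only sees accumulate() and the operator answers STATUS_BATCH_OK with a null result; when POForEach signals the end of the group, the same call returns func.getValue() and resets the UDF via cleanup(). A minimal, self-contained driver showing that calling sequence (the drive() helper is hypothetical; in Pig itself, POForEach toggles the start/end flags instead):

    import java.io.IOException;
    import java.util.List;

    import org.apache.pig.Accumulator;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class AccumulateProtocolSketch {
        // Feeds an Accumulator UDF one pre-built batch at a time, mirroring
        // what POUserFunc.getNext() does while STATUS_BATCH_OK is returned.
        static <T> T drive(Accumulator<T> func, List<DataBag> batches) throws IOException {
            TupleFactory tf = TupleFactory.getInstance();
            for (DataBag batch : batches) {
                Tuple t = tf.newTuple(1);
                t.set(0, batch);           // UDF input: ($0: bag), one batch at a time
                func.accumulate(t);        // the isAccumStarted() branch above
            }
            T value = func.getValue();     // the else branch: finish the group
            func.cleanup();                // reset state for the next key
            return value;
        }
    }
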
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java Thu Nov 12 18:33:15 2009
@@ -53,6 +53,11 @@
 
     @Override
     public Result getNext(Double d) throws ExecException {
+        Result r = accumChild(null, d);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Double left = null, right = null;
@@ -76,6 +81,11 @@
     
     @Override
     public Result getNext(Float f) throws ExecException {
+        Result r = accumChild(null, f);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Float left = null, right = null;
@@ -99,6 +109,11 @@
     
     @Override
     public Result getNext(Integer i) throws ExecException {
+        Result r = accumChild(null, i);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Integer left = null, right = null;
@@ -122,6 +137,11 @@
     
     @Override
     public Result getNext(Long l) throws ExecException {
+        Result r = accumChild(null, l);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Long left = null, right = null;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java Thu Nov 12 18:33:15 2009
@@ -17,6 +17,9 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -26,6 +29,7 @@
 public abstract class UnaryExpressionOperator extends ExpressionOperator {
 
     ExpressionOperator expr;
+    private transient List<ExpressionOperator> child;
     
     public UnaryExpressionOperator(OperatorKey k, int rp) {
         super(k, rp);
@@ -73,4 +77,17 @@
         resultType = op.getResultType();
     }
 
+    /**
+     * Get child expression of this expression
+     */
+    @Override
+    public List<ExpressionOperator> getChildExpressions() {
+        if (child == null) {
+            child = new ArrayList<ExpressionOperator>();		
+            child.add(expr);		
+        }
+        
+        return child;		
+    }
+
 }

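getChildExpressions() builds its one-element list lazily because the field is transient: after the operator is deserialized on the backend the list is null and gets rebuilt on first use. The binary operators presumably receive the analogous two-element version elsewhere in this commit; a sketch of that assumed counterpart (lhs/rhs being BinaryExpressionOperator's operand fields):

    private transient List<ExpressionOperator> child;

    @Override
    public List<ExpressionOperator> getChildExpressions() {
        if (child == null) {
            child = new ArrayList<ExpressionOperator>();
            child.add(lhs);    // left operand
            child.add(rhs);    // right operand
        }
        return child;
    }
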
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java Thu Nov 12 18:33:15 2009
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.data.Tuple;
+
+/**
+ * This interface is used during the Reduce phase to process tuples
+ * in batch mode. It is used by POPackage when all of the UDFs can be
+ * called in accumulative mode. Tuples are not pulled all at once;
+ * instead, each time only a specified number of tuples are pulled out
+ * of the iterator and put into a buffer. This buffer is then wrapped into
+ * a bag to be passed to the operators in the reduce plan.
+ * 
+ * The purpose of doing this is to reduce memory usage and avoid spilling.
+ */
+public interface AccumulativeTupleBuffer {
+    
+    /**
+     * Pull the next batch of tuples from the iterator and put them into this buffer
+     */
+    public void nextBatch() throws IOException;
+    
+    /**
+     * Whether there are more tuples to pull out of the iterator
+     */
+    public boolean hasNextBatch();
+    
+    /**
+     * Clear the internal buffer; this should be called after all data is retrieved
+     */
+    public void clear();
+    
+    /**
+     * Get an iterator over the tuples in the buffer
+     * @param index  the index of the input whose tuples are returned
+     */
+    public Iterator<Tuple> getTuples(int index);
+}

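The intended calling sequence is: loop on hasNextBatch()/nextBatch(), read each batch through getTuples(), and finish with clear(). A toy single-input implementation to make that contract concrete (not part of the commit; POPackage's inner class below is the real implementation):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.pig.data.Tuple;

    public class ListTupleBuffer implements AccumulativeTupleBuffer {
        private final Iterator<Tuple> source;
        private final int batchSize;
        private final List<Tuple> batch = new ArrayList<Tuple>();

        public ListTupleBuffer(List<Tuple> tuples, int batchSize) {
            this.source = tuples.iterator();
            this.batchSize = batchSize;
        }
        public boolean hasNextBatch() { return source.hasNext(); }
        public void nextBatch() throws IOException {
            batch.clear();                 // keep at most one batch in memory
            for (int i = 0; i < batchSize && source.hasNext(); i++) {
                batch.add(source.next());
            }
        }
        public void clear() { batch.clear(); }
        public Iterator<Tuple> getTuples(int index) {
            return batch.iterator();       // single input, so index is ignored
        }
    }
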
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Thu Nov 12 18:33:15 2009
@@ -29,6 +29,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
@@ -37,6 +38,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
@@ -89,6 +91,8 @@
 
     protected PhysicalOperator[] planLeafOps = null;
     
+    protected transient AccumulativeTupleBuffer buffer;
+    
     public POForEach(OperatorKey k) {
         this(k,-1,null,null);
     }
@@ -146,15 +150,54 @@
     @Override
     public boolean supportsMultipleOutputs() {
         return false;
+    }      
+          
+    public void setAccumulative() {
+        super.setAccumulative();
+        for(PhysicalPlan p : inputPlans) {            
+            Iterator<PhysicalOperator> iter = p.iterator();
+            while(iter.hasNext()) {
+                PhysicalOperator po = iter.next();
+                if (po instanceof ExpressionOperator || po instanceof PODistinct) {
+                    po.setAccumulative();
+                }
+            }
+        }
+    }
+
+    public void setAccumStart() {
+        super.setAccumStart();
+        for(PhysicalPlan p : inputPlans) {            
+            Iterator<PhysicalOperator> iter = p.iterator();
+            while(iter.hasNext()) {
+                PhysicalOperator po = iter.next();
+                if (po instanceof ExpressionOperator || po instanceof PODistinct) {
+                    po.setAccumStart();
+                }
+            }
+        }
     }
     
+    public void setAccumEnd() {    	
+        super.setAccumEnd();
+        for(PhysicalPlan p : inputPlans) {            
+            Iterator<PhysicalOperator> iter = p.iterator();
+            while(iter.hasNext()) {
+                PhysicalOperator po = iter.next();
+                if (po instanceof ExpressionOperator || po instanceof PODistinct) {    				
+                    po.setAccumEnd();
+                }
+            }
+        }
+    }
+
     /**
      * Calls getNext on the generate operator inside the nested
      * physical plan and returns it maintaining an additional state
      * to denote the begin and end of the nested plan processing.
      */
     @Override
-    public Result getNext(Tuple t) throws ExecException {
+    public Result getNext(Tuple t) throws ExecException {    	
         Result res = null;
         Result inp = null;
         //The nested plan is under processing
@@ -162,14 +205,15 @@
         //returns
         if(processingPlan){
             while(true) {
-                res = processPlan();
+                res = processPlan();               
+                
                 if(res.returnStatus==POStatus.STATUS_OK) {
                     if(lineageTracer !=  null && res.result != null) {
-                	ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
-                	tOut.synthetic = tIn.synthetic;
-                	lineageTracer.insert(tOut);
-                	lineageTracer.union(tOut, tIn);
-                	res.result = tOut;
+                    ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+                    tOut.synthetic = tIn.synthetic;
+                    lineageTracer.insert(tOut);
+                    lineageTracer.union(tOut, tIn);
+                    res.result = tOut;
                     }
                     return res;
                 }
@@ -198,32 +242,71 @@
             if (inp.returnStatus == POStatus.STATUS_NULL) {
                 continue;
             }
-            
+                       
             attachInputToPlans((Tuple) inp.result);
+            Tuple tuple = (Tuple)inp.result;
+            
             for (PhysicalOperator po : opsToBeReset) {
                 po.reset();
             }
-            res = processPlan();
+            
+            if (isAccumulative()) {            	
+                for(int i=0; i<tuple.size(); i++) {            		
+                    if (tuple.getType(i) == DataType.BAG) {
+                        // we only need to check one bag, because all the bags
+                        // share the same buffer
+                        buffer = ((AccumulativeBag)tuple.get(i)).getTuplebuffer();
+                        break;
+                    }
+                }
+                
+                while(true) {                    		
+                    if (buffer.hasNextBatch()) {        
+                        try {
+                            buffer.nextBatch();
+                        }catch(IOException e) {
+                            throw new ExecException(e);
+                        }
+                        
+                        setAccumStart();                		
+                    }else{                
+                        buffer.clear();
+                        setAccumEnd();                		
+                    }
+                    
+                    res = processPlan();            	
+                    
+                    if (res.returnStatus == POStatus.STATUS_BATCH_OK) {
+                        // attach same input again to process next batch
+                        attachInputToPlans((Tuple) inp.result);
+                    } else {
+                        break;
+                    }
+                } 
+                
+            } else {                        
+                res = processPlan();          
+            }
             
             processingPlan = true;
 
             if(lineageTracer != null && res.result != null) {
-        	//we check for res.result since that can also be null in the case of flatten
-        	tIn = (ExampleTuple) inp.result;
-        	ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
-        	tOut.synthetic = tIn.synthetic;
-        	lineageTracer.insert(tOut);
-        	lineageTracer.union(tOut, tIn);
-        	res.result = tOut;
+            //we check for res.result since that can also be null in the case of flatten
+            tIn = (ExampleTuple) inp.result;
+            ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+            tOut.synthetic = tIn.synthetic;
+            lineageTracer.insert(tOut);
+            lineageTracer.union(tOut, tIn);
+            res.result = tOut;
             }
             
             return res;
         }
     }
 
-    protected Result processPlan() throws ExecException{
+    protected Result processPlan() throws ExecException{    	
         Result res = new Result();
-        
+
         //We check if all the databags have exhausted the tuples. If so we enforce the reading of new data by setting data and its to null
         if(its != null) {
             boolean restartIts = true;
@@ -238,6 +321,7 @@
             }
         }
         
+ 
         if(its == null) {
             //getNext being called for the first time OR starting with a set of new data from inputs 
             its = new Iterator[noItems];
@@ -287,9 +371,13 @@
                 }
                 
                 }
-                
+
+                if (inputData.returnStatus == POStatus.STATUS_BATCH_OK) {                	
+                    continue;
+                }
+
                 if(inputData.returnStatus == POStatus.STATUS_EOP) {
-                    //we are done with all the elements. Time to return.
+                    //we are done with all the elements. Time to return.                	
                     its = null;
                     bags = null;
                     return inputData;
@@ -310,6 +398,11 @@
             }
         }
 
+        // if accumulating, we haven't got data yet for some fields, just return
+        if (isAccumulative() && isAccumStarted()) {
+            res.returnStatus = POStatus.STATUS_BATCH_OK;        	
+            return res;
+        }
         
         while(true) {
             if(data == null) {
@@ -396,8 +489,8 @@
 
     
     protected void attachInputToPlans(Tuple t) {
-        //super.attachInput(t);
-        for(PhysicalPlan p : inputPlans) {
+        //super.attachInput(t);    	
+        for(PhysicalPlan p : inputPlans) {        	
             p.attachInput(t);
         }
     }

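Stripped of bookkeeping, the accumulative branch of POForEach.getNext() reduces to the loop below (a condensed restatement of the hunk above, not new behavior). All AccumulativeBags in the input tuple share one AccumulativeTupleBuffer, so inspecting any single bag is enough to obtain it:

    buffer = ((AccumulativeBag) tuple.get(i)).getTuplebuffer();
    while (true) {
        if (buffer.hasNextBatch()) {
            buffer.nextBatch();              // load the next batch of tuples
            setAccumStart();                 // UDFs will accumulate() this batch
        } else {
            buffer.clear();
            setAccumEnd();                   // UDFs will getValue()/cleanup()
        }
        res = processPlan();
        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
            break;                           // final (or error) result for this input
        }
        attachInputToPlans((Tuple) inp.result);  // same input again, next batch
    }
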
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Thu Nov 12 18:33:15 2009
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -25,6 +27,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.DataBag;
@@ -199,7 +202,7 @@
     public void setInner(boolean[] inner) {
         this.inner = inner;
     }
-
+   
     /**
      * From the inputs, constructs the output tuple
      * for this co-group in the required format which
@@ -218,38 +221,51 @@
             DataBag[] dbs = null;
             dbs = new DataBag[numInputs];
                  
-            String bagType = null;
-            if (PigMapReduce.sJobConf != null) {
-       			bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");       			
-       	    }
-            
-        	for (int i = 0; i < numInputs; i++) {        		          	           		
-        	    if (bagType != null && bagType.equalsIgnoreCase("default")) {        	    	
-           			dbs[i] = mBagFactory.newDefaultBag();           			
-           	    } else {
-        	    	dbs[i] = new InternalCachedBag(numInputs);
-        	    }
-        	}      
-                           
-            //For each indexed tup in the inp, sort them
-            //into their corresponding bags based
-            //on the index
-            while (tupIter.hasNext()) {
-                NullableTuple ntup = tupIter.next();
-                int index = ntup.getIndex();
-                Tuple copy = getValueTuple(ntup, index);  
+            if (isAccumulative()) {
+                // create bag wrapper to pull tuples in many batches
+                // all bags have reference to the sample tuples buffer
+                // which contains tuples from one batch
+                POPackageTupleBuffer buffer = new POPackageTupleBuffer();
+                for (int i = 0; i < numInputs; i++) {
+                    dbs[i] = new AccumulativeBag(buffer, i);
+                }
                 
-                if (numInputs == 1) {
+            } else {
+                // create bag to pull all tuples out of iterator
+                String bagType = null;
+                if (PigMapReduce.sJobConf != null) {
+                       bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");       			
+                   }
+                                
+
+                for (int i = 0; i < numInputs; i++) {        		          	           		
+                    if (bagType != null && bagType.equalsIgnoreCase("default")) {        	    	
+                           dbs[i] = mBagFactory.newDefaultBag();           			
+                       } else {
+                        dbs[i] = new InternalCachedBag(numInputs);
+                    }
+                }      
+                               
+                //For each indexed tup in the inp, sort them
+                //into their corresponding bags based
+                //on the index
+                while (tupIter.hasNext()) {
+                    NullableTuple ntup = tupIter.next();
+                    int index = ntup.getIndex();
+                    Tuple copy = getValueTuple(ntup, index);  
                     
-                    // this is for multi-query merge where 
-                    // the numInputs is always 1, but the index
-                    // (the position of the inner plan in the 
-                    // enclosed operator) may not be 1.
-                    dbs[0].add(copy);
-                } else {
-                    dbs[index].add(copy);
+                    if (numInputs == 1) {
+                        
+                        // this is for multi-query merge where 
+                        // the numInputs is always 1, but the index
+                        // (the position of the inner plan in the 
+                        // enclosed operator) may not be 1.
+                        dbs[0].add(copy);
+                    } else {
+                        dbs[index].add(copy);
+                    }
+                    if(reporter!=null) reporter.progress();
                 }
-                if(reporter!=null) reporter.progress();
             }
                       
             //Construct the output tuple by appending
@@ -259,14 +275,16 @@
             res.set(0,key);
             int i=-1;
             for (DataBag bag : dbs) {
-                if(inner[++i]){
+                i++;
+                if(inner[i] && !isAccumulative()){
                     if(bag.size()==0){
                         detachInput();
                         Result r = new Result();
                         r.returnStatus = POStatus.STATUS_NULL;
                         return r;
                     }
-                }
+                } 
+                
                 res.set(i+1,bag);
             }
         }
@@ -420,4 +438,70 @@
         this.useSecondaryKey = useSecondaryKey;
     }
 
+    private class POPackageTupleBuffer implements AccumulativeTupleBuffer {
+        private List<Tuple>[] bags;
+        private Iterator<NullableTuple> iter;
+        private int batchSize;
+        private Object currKey;
+
+        @SuppressWarnings("unchecked")
+        public POPackageTupleBuffer() {    		
+            batchSize = 20000;
+            if (PigMapReduce.sJobConf != null) {
+                String size = PigMapReduce.sJobConf.get("pig.accumulative.batchsize");
+                if (size != null) {
+                    batchSize = Integer.parseInt(size);
+                }
+            }		
+            
+            this.bags = new List[numInputs];
+            for(int i=0; i<numInputs; i++) {
+                this.bags[i] = new ArrayList<Tuple>();
+            }
+            this.iter = tupIter;
+            this.currKey = key;
+        }
+        
+        @Override
+        public boolean hasNextBatch() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public void nextBatch() throws IOException {
+            for(int i=0; i<bags.length; i++) {
+                bags[i].clear();
+            }
+                        
+            key = currKey;			
+            for(int i=0; i<batchSize; i++) {
+                if (iter.hasNext()) {
+                     NullableTuple ntup = iter.next();
+                     int index = ntup.getIndex();
+                     Tuple copy = getValueTuple(ntup, index);		            
+                     if (numInputs == 1) {
+                            
+                            // this is for multi-query merge where 
+                            // the numInputs is always 1, but the index
+                            // (the position of the inner plan in the 
+                            // enclosed operator) may not be 1.
+                            bags[0].add(copy);
+                     } else {
+                            bags[index].add(copy);
+                     }
+                }
+            }
+        } 
+        
+        public void clear() {
+            for(int i=0; i<bags.length; i++) {
+                bags[i].clear();
+            }
+            iter = null;
+        }
+        
+        public Iterator<Tuple> getTuples(int index) {			
+            return bags[index].iterator();
+        }
+       };
 }

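POPackageTupleBuffer pulls at most batchSize tuples per nextBatch() call; the size defaults to 20000 and is read from the pig.accumulative.batchsize job property, as shown above. A hedged example of overriding it when embedding Pig (PigServer/ExecType are the standard embedding API; the value 5000 is arbitrary):

    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class BatchSizeSetup {
        public static PigServer createServer() throws Exception {
            Properties props = new Properties();
            // Smaller batches lower peak memory per group; larger batches
            // reduce per-batch overhead. Default is 20000.
            props.setProperty("pig.accumulative.batchsize", "5000");
            return new PigServer(ExecType.MAPREDUCE, props);
        }
    }
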
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java Thu Nov 12 18:33:15 2009
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -41,7 +42,7 @@
  * Generates the average of the values of the first field of a tuple. This class is Algebraic in
 * implementation, so if possible the execution will be split into a local and global application
  */
-public class AVG extends EvalFunc<Double> implements Algebraic {
+public class AVG extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
     
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
@@ -178,6 +179,7 @@
         for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
             Tuple t = it.next();
             Double d = (Double)t.get(0);
+            
             // we count nulls in avg as contributing 0
             // a departure from SQL for performance of 
             // COUNT() which implemented by just inspecting
@@ -261,5 +263,53 @@
         funcList.add(new FuncSpec(IntAvg.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
         funcList.add(new FuncSpec(LongAvg.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
         return funcList;
+    }
+
+    /* Accumulator interface implementation */
+    
+    private Double intermediateSum = null;
+    private Double intermediateCount = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double sum = sum(b);
+            if(sum == null) {
+                return;
+            }
+            // set default values
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0.0;
+                intermediateCount = 0.0;
+            }
+            
+            double count = (Long)count(b);
+
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }        
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    @Override
+    public Double getValue() {
+        Double avg = null;
+        if (intermediateCount > 0) {
+            avg = new Double(intermediateSum / intermediateCount);
+        }
+        return avg;
     }    
 }

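AVG, and the builtins that follow, now implement org.apache.pig.Accumulator, which is added elsewhere in r835487 and so is not visible in this part of the diff. For reference, the shape these implementations assume is:

    package org.apache.pig;

    import java.io.IOException;

    import org.apache.pig.data.Tuple;

    public interface Accumulator<T> {
        /** Pass one batch of tuples to the UDF; field 0 of b is a bag. */
        public void accumulate(Tuple b) throws IOException;
        /** Called once after the last batch; returns the final result. */
        public T getValue();
        /** Reset internal state so the instance can serve the next key. */
        public void cleanup();
    }
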
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java Thu Nov 12 18:33:15 2009
@@ -21,6 +21,7 @@
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -36,7 +37,7 @@
  * Generates the count of the values of the first field of a tuple. This class is Algebraic in
 * implementation, so if possible the execution will be split into local and global functions
  */
-public class COUNT extends EvalFunc<Long> implements Algebraic{
+public class COUNT extends EvalFunc<Long> implements Algebraic, Accumulator<Long>{
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     @Override
@@ -136,5 +137,38 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
     }
+    
+    /* Accumulator interface implementation */
+    private long intermediateCount = 0L;
+
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            DataBag bag = (DataBag)b.get(0);
+            Iterator it = bag.iterator();
+            while (it.hasNext()){
+                Tuple t = (Tuple)it.next();
+                if (t != null && t.size() > 0 && t.get(0) != null) {
+                    intermediateCount += 1;
+                }
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing count in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateCount = 0L;
+    }
+
+    @Override
+    public Long getValue() {
+        return intermediateCount;
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java Thu Nov 12 18:33:15 2009
@@ -21,6 +21,7 @@
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -38,7 +39,7 @@
  * implements SQL COUNT(*) semantics. This class is Algebraic in
 * implementation, so if possible the execution will be split into local and global functions
  */
-public class COUNT_STAR extends EvalFunc<Long> implements Algebraic{
+public class COUNT_STAR extends EvalFunc<Long> implements Algebraic, Accumulator<Long>{
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     @Override
@@ -127,4 +128,31 @@
         return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
     }
 
+    /* Accumulator interface implementation */
+    
+    private long intermediateCount = 0L;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            intermediateCount += sum(b);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateCount = 0L;
+    }
+
+    @Override
+    public Long getValue() {
+        return intermediateCount;
+    }
+
 }
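
The two counters differ only in null handling: COUNT skips tuples whose first field is null, while COUNT_STAR, matching SQL COUNT(*), counts every tuple in the bag. A sketch of the contrast under that assumption, with illustrative values:

    import org.apache.pig.builtin.COUNT;
    import org.apache.pig.builtin.COUNT_STAR;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class CountSemanticsDemo {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();
            DataBag bag = BagFactory.getInstance().newDefaultBag();
            Tuple withValue = tf.newTuple(1);
            withValue.set(0, "x");
            bag.add(withValue);
            bag.add(tf.newTuple(1));    // second tuple: its only field stays null
            Tuple input = tf.newTuple(1);
            input.set(0, bag);

            COUNT count = new COUNT();
            count.accumulate(input);
            System.out.println(count.getValue());     // 1 -- the null field is skipped

            COUNT_STAR countStar = new COUNT_STAR();
            countStar.accumulate(input);
            System.out.println(countStar.getValue()); // 2 -- every tuple is counted
        }
    }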

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java Thu Nov 12 18:33:15 2009
@@ -21,6 +21,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -38,7 +39,7 @@
  * Generates the average of the values of the first field of a tuple. This class is Algebraic in
  * implementation, so if possible the execution will be split into a local and global application
  */
-public class DoubleAvg extends EvalFunc<Double> implements Algebraic {
+public class DoubleAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
     
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
@@ -233,4 +234,52 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
+    
+    /* Accumulator interface */
+    
+    private Double intermediateSum = null;
+    private Double intermediateCount = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double sum = sum(b);
+            if(sum == null) {
+                return;
+            }
+            // set default values
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0.0;
+                intermediateCount = 0.0;
+            }
+            
+            double count = (Long)count(b);
+
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }        
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    @Override
+    public Double getValue() {
+        Double avg = null;
+        if (intermediateCount != null && intermediateCount > 0) {
+            avg = new Double(intermediateSum / intermediateCount);
+        }
+        return avg;
+    }    
 }
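
The running sum/count pair stays unset until the first bag containing a non-null value arrives, so an all-null group produces a null average rather than 0.0 or NaN (getValue() above guards the unset state). A sketch of that behavior, with illustrative values:

    import org.apache.pig.builtin.DoubleAvg;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class DoubleAvgDemo {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();
            BagFactory bf = BagFactory.getInstance();
            DoubleAvg avg = new DoubleAvg();

            DataBag allNulls = bf.newDefaultBag();
            allNulls.add(tf.newTuple(1));            // one tuple whose field is null
            Tuple in1 = tf.newTuple(1);
            in1.set(0, allNulls);
            avg.accumulate(in1);                     // sum(b) is null, so state stays unset
            System.out.println(avg.getValue());      // null

            DataBag values = bf.newDefaultBag();
            for (double v : new double[] {1.0, 2.0, 6.0}) {
                Tuple t = tf.newTuple(1);
                t.set(0, v);
                values.add(t);
            }
            Tuple in2 = tf.newTuple(1);
            in2.set(0, values);
            avg.accumulate(in2);
            System.out.println(avg.getValue());      // 3.0
        }
    }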

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -34,7 +35,7 @@
 /**
  * Generates the max of the values of the first field of a tuple.
  */
-public class DoubleMax extends EvalFunc<Double> implements Algebraic {
+public class DoubleMax extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
 
     @Override
     public Double exec(Tuple input) throws IOException {
@@ -154,5 +155,40 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
+
+    /* Accumulator interface */
+    
+    private Double intermediateMax = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curMax = max(b);
+            if (curMax == null) {
+                return;
+            }
+            /* first non-null value seen: seed intermediateMax with negative infinity, the identity for max */
+            if (intermediateMax == null) {
+                intermediateMax = Double.NEGATIVE_INFINITY;
+            }
+            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateMax;
+    }
     
 }
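
The Min/Max accumulators in this commit all share one fold: a null running value means "nothing seen yet", null inputs never move it, and the first real value seeds the identity element (negative infinity for max, positive infinity for min, and MIN_VALUE/MAX_VALUE for the integral types). The pattern in isolation, detached from the Pig types:

    public class MaxFoldSketch {
        public static void main(String[] args) {
            Double running = null;                       // null means "no value seen yet"
            for (Double v : new Double[] {3.0, null, 7.0}) {
                if (v == null) {
                    continue;                            // nulls never move the max
                }
                if (running == null) {
                    running = Double.NEGATIVE_INFINITY;  // identity element for max
                }
                running = Math.max(running, v);
            }
            System.out.println(running);                 // 7.0; stays null if every input is null
        }
    }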

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
 /**
  * Generates the min of the Double values in the first field of a tuple.
  */
-public class DoubleMin extends EvalFunc<Double> implements Algebraic {
+public class DoubleMin extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
 
     @Override
     public Double exec(Tuple input) throws IOException {
@@ -152,4 +153,38 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
+    
+    /* Accumulator interface implementation */
+    private Double intermediateMin = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curMin = min(b);
+            if (curMin == null) {
+                return;
+            }
+            /* first non-null value seen: seed intermediateMin with positive infinity, the identity for min */
+            if (intermediateMin == null) {
+                intermediateMin = Double.POSITIVE_INFINITY;
+            }
+            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateMin;
+    }    
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java Thu Nov 12 18:33:15 2009
@@ -21,6 +21,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -38,7 +39,7 @@
 /**
  * Generates the sum of the values of the first field of a tuple.
  */
-public class DoubleSum extends EvalFunc<Double> implements Algebraic {
+public class DoubleSum extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
 
     @Override
     public Double exec(Tuple input) throws IOException {
@@ -160,4 +161,34 @@
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
     
+    /* Accumulator interface implementation*/
+    private Double intermediateSum = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curSum = sum(b);
+            if (curSum == null) {
+                return;
+            }
+            intermediateSum = (intermediateSum == null ? 0.0 : intermediateSum) + curSum;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateSum;
+    }    
+    
 }
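
The Sum accumulators use a lazy ternary instead of a sentinel: the running total is seeded with zero only when the first non-null partial sum arrives, which keeps the final result null when every input was null. The same pattern in isolation, with illustrative values:

    public class SumFoldSketch {
        public static void main(String[] args) {
            Double total = null;                  // stays null until a non-null partial arrives
            for (Double partial : new Double[] {null, 1.5, 2.5}) {
                if (partial == null) {
                    continue;
                }
                total = (total == null ? 0.0 : total) + partial;
            }
            System.out.println(total);            // 4.0; all-null input leaves it null
        }
    }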

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
  * Generates the average of the values of the first field of a tuple. This class is Algebraic in
  * implementation, so if possible the execution will be split into a local and global application
  */
-public class FloatAvg extends EvalFunc<Double> implements Algebraic {
+public class FloatAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
     
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
@@ -230,5 +231,53 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
+    
+    /* Accumulator interface */
+
+    private Double intermediateSum = null;
+    private Double intermediateCount = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double sum = sum(b);
+            if(sum == null) {
+                return;
+            }
+            // set default values
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0.0;
+                intermediateCount = 0.0;
+            }
+            
+            double count = (Long)count(b);
+            
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }        
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    @Override
+    public Double getValue() {
+        Double avg = null;
+        if (intermediateCount != null && intermediateCount > 0) {
+            avg = new Double(intermediateSum / intermediateCount);
+        }
+        return avg;
+    }    
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
 /**
  * Generates the max of the values of the first field of a tuple.
  */
-public class FloatMax extends EvalFunc<Float> implements Algebraic {
+public class FloatMax extends EvalFunc<Float> implements Algebraic, Accumulator<Float> {
 
     @Override
     public Float exec(Tuple input) throws IOException {
@@ -152,4 +153,39 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.FLOAT)); 
     }
+    
+    /* Accumulator interface */
+    
+    private Float intermediateMax = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Float curMax = max(b);
+            if (curMax == null) {
+                return;
+            }
+            /* first non-null value seen: seed intermediateMax with negative infinity, the identity for max */
+            if (intermediateMax == null) {
+                intermediateMax = Float.NEGATIVE_INFINITY;
+            }
+            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    @Override
+    public Float getValue() {
+        return intermediateMax;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
 /**
  * Generates the min of the Float values in the first field of a tuple.
  */
-public class FloatMin extends EvalFunc<Float> implements Algebraic {
+public class FloatMin extends EvalFunc<Float> implements Algebraic, Accumulator<Float> {
 
     @Override
     public Float exec(Tuple input) throws IOException {
@@ -152,4 +153,38 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.FLOAT)); 
     }
+    
+    /* Accumulator interface implementation */
+    private Float intermediateMin = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Float curMin = min(b);
+            if (curMin == null) {
+                return;
+            }
+            /* first non-null value seen: seed intermediateMin with positive infinity, the identity for min */
+            if (intermediateMin == null) {
+                intermediateMin = Float.POSITIVE_INFINITY;
+            }
+            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    @Override
+    public Float getValue() {
+        return intermediateMin;
+    }    
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
 /**
  * Generates the sum of the Float values in the first field of a tuple.
  */
-public class FloatSum extends EvalFunc<Double> implements Algebraic {
+public class FloatSum extends EvalFunc<Double> implements Algebraic, Accumulator<Double>{
 
     @Override
     public Double exec(Tuple input) throws IOException {
@@ -196,5 +197,35 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
+    
+    /* Accumulator interface implementation*/
+    private Double intermediateSum = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curSum = sum(b);
+            if (curSum == null) {
+                return;
+            }
+            intermediateSum = (intermediateSum == null ? 0.0 : intermediateSum) + curSum;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateSum;
+    }    
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
  * Generates the average of the values of the first field of a tuple. This class is Algebraic in
  * implementation, so if possible the execution will be split into a local and global application
  */
-public class IntAvg extends EvalFunc<Double> implements Algebraic {
+public class IntAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
     
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
@@ -236,4 +237,51 @@
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
 
+    /* Accumulator interface */
+
+    private Long intermediateSum = null;
+    private Double intermediateCount = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long sum = sum(b);
+            if(sum == null) {
+                return;
+            }
+            // set default values
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0L;
+                intermediateCount = 0.0;
+            }
+            
+            double count = (Long)count(b);
+
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }        
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    @Override
+    public Double getValue() {
+        Double avg = null;
+        if (intermediateCount != null && intermediateCount > 0) {
+            avg = new Double(intermediateSum / intermediateCount);
+        }
+        return avg;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
 /**
  * Generates the max of the values of the first field of a tuple.
  */
-public class IntMax extends EvalFunc<Integer> implements Algebraic {
+public class IntMax extends EvalFunc<Integer> implements Algebraic, Accumulator<Integer> {
 
     @Override
     public Integer exec(Tuple input) throws IOException {
@@ -153,4 +154,39 @@
         return new Schema(new Schema.FieldSchema(null, DataType.INTEGER)); 
     }
     
+    /* Accumulator interface */
+    
+    private Integer intermediateMax = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Integer curMax = max(b);
+            if (curMax == null) {
+                return;
+            }
+            /* first non-null value seen: seed intermediateMax with Integer.MIN_VALUE, the identity for max */
+            if (intermediateMax == null) {
+                intermediateMax = Integer.MIN_VALUE;
+            }
+            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    @Override
+    public Integer getValue() {
+        return intermediateMax;
+    }
+    
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
 /**
  * Generates the min of the Integer values in the first field of a tuple.
  */
-public class IntMin extends EvalFunc<Integer> implements Algebraic {
+public class IntMin extends EvalFunc<Integer> implements Algebraic, Accumulator<Integer> {
 
     @Override
     public Integer exec(Tuple input) throws IOException {
@@ -154,4 +155,38 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.INTEGER)); 
     }
+    
+    /* Accumulator interface implementation */
+    private Integer intermediateMin = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Integer curMin = min(b);
+            if (curMin == null) {
+                return;
+            }
+            /* first non-null value seen: seed intermediateMin with Integer.MAX_VALUE, the identity for min */
+            if (intermediateMin == null) {
+                intermediateMin = Integer.MAX_VALUE;
+            }
+            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    @Override
+    public Integer getValue() {
+        return intermediateMin;
+    }    
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -34,7 +35,7 @@
 /**
  * Generates the sum of the Integer in the first field of a tuple.
  */
-public class IntSum extends EvalFunc<Long> implements Algebraic {
+public class IntSum extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
 
     @Override
     public Long exec(Tuple input) throws IOException {
@@ -198,4 +199,33 @@
         return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
     }
 
+    /* Accumulator interface implementation*/
+    private Long intermediateSum = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long curSum = sum(b);
+            if (curSum == null) {
+                return;
+            }
+            intermediateSum = (intermediateSum == null ? 0L : intermediateSum) + curSum;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+    }
+
+    @Override
+    public Long getValue() {
+        return intermediateSum;
+    }    
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
  * Generates the average of the values of the first field of a tuple. This class is Algebraic in
  * implementation, so if possible the execution will be split into a local and global application
  */
-public class LongAvg extends EvalFunc<Double> implements Algebraic {
+public class LongAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
     
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
@@ -230,5 +231,53 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
+    
+    /* Accumulator interface */
+   
+    private Long intermediateSum = null;
+    private Double intermediateCount = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long sum = sum(b);
+            if(sum == null) {
+                return;
+            }
+            // set default values
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0L;
+                intermediateCount = 0.0;
+            }
+            
+            double count = (Long)count(b);
+
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }        
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    @Override
+    public Double getValue() {
+        Double avg = null;
+        if (intermediateCount != null && intermediateCount > 0) {
+            avg = new Double(intermediateSum / intermediateCount);
+        }
+        return avg;
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
 /**
  * Generates the max of the values of the first field of a tuple.
  */
-public class LongMax extends EvalFunc<Long> implements Algebraic {
+public class LongMax extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
 
     @Override
     public Long exec(Tuple input) throws IOException {
@@ -152,4 +153,39 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
     }
+    
+    /* Accumulator interface */
+    
+    private Long intermediateMax = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long curMax = max(b);
+            if (curMax == null) {
+                return;
+            }
+            /* first non-null value seen: seed intermediateMax with Long.MIN_VALUE, the identity for max */
+            if (intermediateMax == null) {
+                intermediateMax = Long.MIN_VALUE;
+            }
+            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);           
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    @Override
+    public Long getValue() {
+        return intermediateMax;
+    }
 }