You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/18 18:46:13 UTC

svn commit: r1543094 [1/2] - in /pig/trunk: ./ src/org/apache/pig/builtin/ test/org/apache/pig/test/

Author: cheolsoo
Date: Mon Nov 18 17:46:13 2013
New Revision: 1543094

URL: http://svn.apache.org/r1543094
Log:
PIG-3580: MIN, MAX and AVG functions for BigDecimal and BigInteger (harichinnan via cheolsoo)

Added:
    pig/trunk/src/org/apache/pig/builtin/BigDecimalAvg.java
    pig/trunk/src/org/apache/pig/builtin/BigDecimalMax.java
    pig/trunk/src/org/apache/pig/builtin/BigDecimalMin.java
    pig/trunk/src/org/apache/pig/builtin/BigDecimalWrapper.java
    pig/trunk/src/org/apache/pig/builtin/BigIntegerAvg.java
    pig/trunk/src/org/apache/pig/builtin/BigIntegerMax.java
    pig/trunk/src/org/apache/pig/builtin/BigIntegerMin.java
    pig/trunk/src/org/apache/pig/builtin/BigIntegerWrapper.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/builtin/AVG.java
    pig/trunk/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java
    pig/trunk/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java
    pig/trunk/src/org/apache/pig/builtin/MAX.java
    pig/trunk/src/org/apache/pig/builtin/MIN.java
    pig/trunk/test/org/apache/pig/test/TestBuiltin.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1543094&r1=1543093&r2=1543094&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Nov 18 17:46:13 2013
@@ -28,6 +28,8 @@ PIG-3419: Pluggable Execution Engine (ac
 
 IMPROVEMENTS
 
+PIG-3580: MIN, MAX and AVG functions for BigDecimal and BigInteger (harichinnan via cheolsoo)
+
 PIG-3569: SUM function for BigDecimal and BigInteger (harichinnan via rohini)
 
 PIG-3505: Make AvroStorage sync interval take default from io.file.buffer.size (rohini)

Modified: pig/trunk/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AVG.java?rev=1543094&r1=1543093&r2=1543094&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AVG.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/AVG.java Mon Nov 18 17:46:13 2013
@@ -47,7 +47,7 @@ import org.apache.pig.backend.executione
  * tuples of one record each.  If Pig knows from the schema that this function
  * will be passed a bag of integers or longs, it will use a specially adapted version of
  * AVG that uses integer arithmetic for summing the data.  The return type
- * of AVG will always be double, regardless of the input type. 
+ * of AVG will always be double, regardless of the input type.
  * <p>
  * AVG implements the {@link org.apache.pig.Accumulator} interface as well.
  * While this will never be
@@ -55,7 +55,7 @@ import org.apache.pig.backend.executione
  * used for a given calculation
  */
 public class AVG extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-    
+
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     @Override
@@ -72,7 +72,7 @@ public class AVG extends EvalFunc<Double
             Double avg = null;
             if (count > 0)
                 avg = new Double(sum / count);
-    
+
             return avg;
         } catch (ExecException ee) {
             throw ee;
@@ -125,9 +125,9 @@ public class AVG extends EvalFunc<Double
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);            
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
-                
+
         }
     }
 
@@ -142,8 +142,8 @@ public class AVG extends EvalFunc<Double
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
-            
+                throw new ExecException(msg, errCode, PigException.BUG, e);
+
             }
         }
     }
@@ -171,7 +171,7 @@ public class AVG extends EvalFunc<Double
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);           
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
@@ -191,9 +191,9 @@ public class AVG extends EvalFunc<Double
         for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
             Tuple t = it.next();
             Double d = (Double)t.get(0);
-            
+
             // we count nulls in avg as contributing 0
-            // a departure from SQL for performance of 
+            // a departure from SQL for performance of
             // COUNT() which implemented by just inspecting
             // size of the bag
             if(d == null) {
@@ -216,19 +216,19 @@ public class AVG extends EvalFunc<Double
     static protected long count(Tuple input) throws ExecException {
         DataBag values = (DataBag)input.get(0);
         long cnt = 0;
-        Iterator it = values.iterator();
+        Iterator<Tuple> it = values.iterator();
         while (it.hasNext()){
-            Tuple t = (Tuple)it.next(); 
+            Tuple t = (Tuple)it.next();
             if (t != null && t.size() > 0 && t.get(0) != null)
                 cnt ++;
         }
-                    
+
         return cnt;
     }
 
     static protected Double sum(Tuple input) throws ExecException, IOException {
         DataBag values = (DataBag)input.get(0);
-        
+
         // if we were handed an empty bag, return NULL
         if(values.size() == 0) {
             return null;
@@ -260,7 +260,7 @@ public class AVG extends EvalFunc<Double
 
     @Override
     public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
+        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
     }
 
     /* (non-Javadoc)
@@ -274,14 +274,16 @@ public class AVG extends EvalFunc<Double
         funcList.add(new FuncSpec(FloatAvg.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.FLOAT)));
         funcList.add(new FuncSpec(IntAvg.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
         funcList.add(new FuncSpec(LongAvg.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
+        funcList.add(new FuncSpec(BigDecimalAvg.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.BIGDECIMAL)));
+        funcList.add(new FuncSpec(BigIntegerAvg.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.BIGINTEGER)));
         return funcList;
     }
 
     /* Accumulator interface implementation */
-    
+
     private Double intermediateSum = null;
     private Double intermediateCount = null;
-    
+
     @Override
     public void accumulate(Tuple b) throws IOException {
         try {
@@ -294,7 +296,7 @@ public class AVG extends EvalFunc<Double
                 intermediateSum = 0.0;
                 intermediateCount = 0.0;
             }
-            
+
             double count = (Long)count(b);
 
             if (count > 0) {
@@ -306,9 +308,9 @@ public class AVG extends EvalFunc<Double
         } catch (Exception e) {
             int errCode = 2106;
             String msg = "Error while computing average in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);           
+            throw new ExecException(msg, errCode, PigException.BUG, e);
         }
-    }        
+    }
 
     @Override
     public void cleanup() {
@@ -323,5 +325,5 @@ public class AVG extends EvalFunc<Double
             avg = new Double(intermediateSum / intermediateCount);
         }
         return avg;
-    }    
+    }
 }

Modified: pig/trunk/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java?rev=1543094&r1=1543093&r2=1543094&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java Mon Nov 18 17:46:13 2013
@@ -20,7 +20,6 @@ package org.apache.pig.builtin;
 import java.io.IOException;
 import java.util.Iterator;
 import java.math.BigDecimal;
-import java.math.MathContext;
 
 import org.apache.pig.Accumulator;
 import org.apache.pig.PigException;
@@ -41,8 +40,9 @@ public abstract class AlgebraicBigDecima
 
     protected static BigDecimal getSeed(KNOWN_OP op) {
         switch (op) {
-        //TODO: Implement MAX and MIN functions
         case SUM: return BigDecimal.ZERO;
+        case MAX: return BigDecimalWrapper.NEGATIVE_INFINITY();
+        case MIN: return BigDecimalWrapper.POSITIVE_INFINITY();
         default: return null;
         }
     }
@@ -55,11 +55,28 @@ public abstract class AlgebraicBigDecima
         } else {
             BigDecimal retVal = null;
             switch (op) {
-            //TODO: Implement MAX and MIN functions
-            case SUM: 
+            case SUM:
                 retVal = arg1.add(arg2);
                 break;
-            default: 
+            case MAX:
+                if (BigDecimalWrapper.class.isInstance(arg1) && (((BigDecimalWrapper)arg1).isNegativeInfinity())) {
+                    retVal = arg2;
+                } else if (BigDecimalWrapper.class.isInstance(arg2) && (((BigDecimalWrapper)arg2).isNegativeInfinity())) {
+                    retVal = arg1;
+                } else {
+                    retVal = arg1.max(arg2);
+                }
+                break;
+            case MIN:
+                if (BigDecimalWrapper.class.isInstance(arg1) && (((BigDecimalWrapper)arg1).isPositiveInfinity())) {
+                    retVal = arg2;
+                } else if (BigDecimalWrapper.class.isInstance(arg2) && (((BigDecimalWrapper)arg2).isPositiveInfinity())) {
+                    retVal = arg1;
+                } else {
+                    retVal = arg1.min(arg2);
+                }
+                break;
+            default:
                 retVal = null;
                 break;
             }

Modified: pig/trunk/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java?rev=1543094&r1=1543093&r2=1543094&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java Mon Nov 18 17:46:13 2013
@@ -20,7 +20,6 @@ package org.apache.pig.builtin;
 import java.io.IOException;
 import java.util.Iterator;
 import java.math.BigInteger;
-import java.math.MathContext;
 
 import org.apache.pig.Accumulator;
 import org.apache.pig.PigException;
@@ -41,8 +40,9 @@ public abstract class AlgebraicBigIntege
 
     protected static BigInteger getSeed(KNOWN_OP op) {
         switch (op) {
-        //TODO: Implement MAX and MIN functions
         case SUM: return BigInteger.ZERO;
+        case MAX: return BigIntegerWrapper.NEGATIVE_INFINITY();
+        case MIN: return BigIntegerWrapper.POSITIVE_INFINITY();
         default: return null;
         }
     }
@@ -55,11 +55,28 @@ public abstract class AlgebraicBigIntege
         } else {
             BigInteger retVal = null;
             switch (op) {
-            //TODO: Implement MAX and MIN functions
-            case SUM: 
+            case SUM:
                 retVal = arg1.add(arg2);
                 break;
-            default: 
+            case MAX:
+                if (BigIntegerWrapper.class.isInstance(arg1) && (((BigIntegerWrapper)arg1).isNegativeInfinity())) {
+                    retVal = arg2;
+                } else if(BigIntegerWrapper.class.isInstance(arg2) && (((BigIntegerWrapper)arg2).isNegativeInfinity())) {
+                    retVal = arg1;
+                } else {
+                    retVal = arg1.max(arg2);
+                }
+                break;
+            case MIN:
+                if (BigIntegerWrapper.class.isInstance(arg1) && (((BigIntegerWrapper)arg1).isPositiveInfinity())) {
+                    retVal = arg2;
+                } else if (BigIntegerWrapper.class.isInstance(arg2) && (((BigIntegerWrapper)arg2).isPositiveInfinity())) {
+                    retVal = arg1;
+                } else{
+                    retVal = arg1.min(arg2);
+                }
+                break;
+            default:
                 retVal = null;
                 break;
             }
@@ -84,7 +101,7 @@ public abstract class AlgebraicBigIntege
                 BigInteger d = (BigInteger) n;
                 sawNonNull = true;
                 sofar = doWork(sofar, d, opProvider.getOp());
-            }catch(RuntimeException exp) {
+            } catch(RuntimeException exp) {
                 int errCode = 2103;
                 throw new ExecException("Problem doing work on BigInteger", errCode, PigException.BUG, exp);
             }
@@ -136,7 +153,7 @@ public abstract class AlgebraicBigIntege
 
     @Override
     public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.BIGDECIMAL));
+        return new Schema(new Schema.FieldSchema(null, DataType.BIGINTEGER));
     }
 
     /* Accumulator interface implementation*/

Added: pig/trunk/src/org/apache/pig/builtin/BigDecimalAvg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/BigDecimalAvg.java?rev=1543094&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/BigDecimalAvg.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/BigDecimalAvg.java Mon Nov 18 17:46:13 2013
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.math.BigDecimal;
+import java.math.MathContext;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.backend.executionengine.ExecException;
+
+
+/**
+ * This class should never be used directly; use {@link AVG} instead.
+ */
+public class BigDecimalAvg extends EvalFunc<BigDecimal> implements Algebraic, Accumulator<BigDecimal> {
+
+    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    @Override
+    public BigDecimal exec(Tuple input) throws IOException {
+        try {
+            BigDecimal sum = sum(input);
+            if (sum == null) {
+                // either we were handed an empty bag or a bag
+                // filled with nulls - return null in this case
+                return null;
+            }
+            BigDecimal count = count(input);
+
+            BigDecimal avg = null;
+            if (count.compareTo(BigDecimal.ZERO) > 0)
+                avg = div(sum, count);
+            return avg;
+        } catch (ExecException ee) {
+            throw ee;
+        }
+    }
+
+    public String getInitial() {
+        return Initial.class.getName();
+    }
+
+    public String getIntermed() {
+        return Intermediate.class.getName();
+    }
+
+    public String getFinal() {
+        return Final.class.getName();
+    }
+
+    static public class Initial extends EvalFunc<Tuple> {
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                Tuple t = mTupleFactory.newTuple(2);
+                // input is a bag with one tuple containing
+                // the column we are trying to avg on
+                DataBag bg = (DataBag) input.get(0);
+                BigDecimal d = null;
+                if (bg.iterator().hasNext()) {
+                    Tuple tp = bg.iterator().next();
+                    d = (BigDecimal)(tp.get(0));
+                }
+                t.set(0, d);
+                if (d != null) {
+                    t.set(1, BigDecimal.ONE);
+                } else {
+                    t.set(1, BigDecimal.ZERO);
+                }
+                return t;
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                String msg = "Error while computing average in " + this.getClass().getSimpleName();
+                throw new ExecException(msg, errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple> {
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                DataBag b = (DataBag)input.get(0);
+                return combine(b);
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                String msg = "Error while computing average in " + this.getClass().getSimpleName();
+                throw new ExecException(msg, errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    static public class Final extends EvalFunc<BigDecimal> {
+        @Override
+        public BigDecimal exec(Tuple input) throws IOException {
+            try {
+                DataBag b = (DataBag)input.get(0);
+                Tuple combined = combine(b);
+
+                BigDecimal sum = (BigDecimal)combined.get(0);
+                if (sum == null) {
+                    return null;
+                }
+                BigDecimal count = (BigDecimal)combined.get(1);
+
+                BigDecimal avg = null;
+                if (count.compareTo(BigDecimal.ZERO) > 0) {
+                    avg = div(sum,count);
+                }
+                return avg;
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                String msg = "Error while computing average in " + this.getClass().getSimpleName();
+                throw new ExecException(msg, errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    static protected Tuple combine(DataBag values) throws ExecException {
+        BigDecimal sum = BigDecimal.ZERO;
+        BigDecimal count = BigDecimal.ZERO;
+
+        // combine is called from Intermediate and Final
+        // In either case, Initial would have been called
+        // before and would have sent in valid tuples
+        // Hence we don't need to check if incoming bag
+        // is empty
+
+        Tuple output = mTupleFactory.newTuple(2);
+        boolean sawNonNull = false;
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            BigDecimal d = (BigDecimal)t.get(0);
+            // we count nulls in avg as contributing 0,
+            // a departure from SQL made for the performance of
+            // COUNT(), which is implemented by just inspecting
+            // the size of the bag
+            if (d == null) {
+                d = BigDecimal.ZERO;
+            } else {
+                sawNonNull = true;
+            }
+            sum = sum.add(d);
+            count = count.add((BigDecimal)t.get(1));
+        }
+        if (sawNonNull) {
+            output.set(0, sum);
+        } else {
+            output.set(0, null);
+        }
+        output.set(1, count);
+        return output;
+    }
+
+    static protected BigDecimal count(Tuple input) throws ExecException {
+        DataBag values = (DataBag)input.get(0);
+        Iterator<Tuple> it = values.iterator();
+        BigDecimal cnt = BigDecimal.ZERO;
+        while (it.hasNext()){
+            Tuple t = (Tuple)it.next();
+            if (t != null && t.size() > 0 && t.get(0) != null)
+                cnt = cnt.add(BigDecimal.ONE);
+        }
+        return cnt;
+    }
+
+    static protected BigDecimal sum(Tuple input) throws ExecException, IOException {
+        DataBag values = (DataBag)input.get(0);
+
+        // if we were handed an empty bag, return NULL
+        if(values.size() == 0) {
+            return null;
+        }
+
+        BigDecimal sum = BigDecimal.ZERO;
+        boolean sawNonNull = false;
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            try{
+                BigDecimal d = (BigDecimal)(t.get(0));
+                if (d == null) continue;
+                sawNonNull = true;
+                sum = sum.add(d);
+            }catch(RuntimeException exp) {
+                int errCode = 2103;
+                String msg = "Problem while computing sum of BigDecimals.";
+                throw new ExecException(msg, errCode, PigException.BUG, exp);
+            }
+        }
+
+        if(sawNonNull) {
+            return sum;
+        } else {
+            return null;
+        }
+    }
+
+    static protected BigDecimal div(BigDecimal dividend, BigDecimal divisor){
+        // Averages will have IEEE 754R Decimal128 format, 34 digits, and a
+        // rounding mode of HALF_EVEN, the IEEE 754R default
+        return dividend.divide(divisor, MathContext.DECIMAL128);
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(null, DataType.BIGDECIMAL));
+    }
+
+    /* Accumulator interface */
+
+    private BigDecimal intermediateSum = null;
+    private BigDecimal intermediateCount = null;
+
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            BigDecimal sum = sum(b);
+            if(sum == null) {
+                return;
+            }
+            // set default values
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = BigDecimal.ZERO;
+                intermediateCount = BigDecimal.ZERO;
+            }
+
+            BigDecimal count = (BigDecimal)count(b);
+
+            if (count.compareTo(BigDecimal.ZERO) > 0) {
+                intermediateCount = intermediateCount.add(count);
+                intermediateSum = intermediateSum.add(sum);
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    @Override
+    public BigDecimal getValue() {
+        BigDecimal avg = null;
+        if (intermediateCount != null && (intermediateCount.compareTo(BigDecimal.ZERO) > 0)) {
+            avg = div(intermediateSum,intermediateCount);
+        }
+        return avg;
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/BigDecimalMax.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/BigDecimalMax.java?rev=1543094&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/BigDecimalMax.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/BigDecimalMax.java Mon Nov 18 17:46:13 2013
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+/**
+ * This class should never be used directly; use {@link MAX} instead.
+ */
+public class BigDecimalMax extends AlgebraicBigDecimalMathBase {
+
+    public BigDecimalMax() {
+        setOp(KNOWN_OP.MAX);
+    }
+
+    public static class Intermediate extends AlgebraicBigDecimalMathBase.Intermediate {
+        @Override
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MAX;
+        }
+    }
+
+    public static class Final extends AlgebraicBigDecimalMathBase.Final {
+        @Override
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MAX;
+        }
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/BigDecimalMin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/BigDecimalMin.java?rev=1543094&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/BigDecimalMin.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/BigDecimalMin.java Mon Nov 18 17:46:13 2013
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+/**
+ * This class should never be used directly; use {@link MIN} instead.
+ */
+public class BigDecimalMin extends AlgebraicBigDecimalMathBase {
+
+    public BigDecimalMin() {
+        setOp(KNOWN_OP.MIN);
+    }
+
+    public static class Intermediate extends AlgebraicBigDecimalMathBase.Intermediate {
+        @Override
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MIN;
+        }
+    }
+
+    public static class Final extends AlgebraicBigDecimalMathBase.Final {
+        @Override
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MIN;
+        }
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/BigDecimalWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/BigDecimalWrapper.java?rev=1543094&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/BigDecimalWrapper.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/BigDecimalWrapper.java Mon Nov 18 17:46:13 2013
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.MathContext;
+
+/**
+ * Max and min seeds cannot be expressed as ordinary BigDecimal values, because a BigDecimal
+ * can be as large as the computer allows. This wrapper provides the seeds for the MIN and MAX
+ * functions in AlgebraicBigDecimalMathBase.java.
+ */
+
+public class BigDecimalWrapper extends BigDecimal {
+    private static final long serialVersionUID = 1L;
+
+    private enum BigDecimalInfinity {
+        NEGATIVE_INFINITY,
+        POSITIVE_INFINITY
+    }
+
+    private BigDecimalInfinity infinity;
+
+    // BigDecimal constructors initialized to keep compiler happy
+    public BigDecimalWrapper(BigInteger val) {
+        super(val);
+    }
+
+    public BigDecimalWrapper(BigInteger unscaledVal, int scale) {
+        super(unscaledVal, scale);
+    }
+
+    public BigDecimalWrapper(BigInteger unscaledVal, int scale, MathContext mc) {
+        super(unscaledVal, scale, mc);
+    }
+
+    public BigDecimalWrapper(BigInteger val, MathContext mc) {
+        super(val, mc);
+    }
+
+    public BigDecimalWrapper(char[] in) {
+        super(in);
+    }
+
+    public BigDecimalWrapper(char[] in, int offset, int len) {
+        super(in, offset, len);
+
+    }
+
+    public BigDecimalWrapper(char[] in, int offset, int len, MathContext mc) {
+        super(in, offset, len, mc);
+    }
+
+    public BigDecimalWrapper(char[] in, MathContext mc) {
+        super(in, mc);
+    }
+
+    public BigDecimalWrapper(double val) {
+        super(val);
+    }
+
+    public BigDecimalWrapper(double val, MathContext mc) {
+        super(val, mc);
+    }
+
+    public BigDecimalWrapper(int val) {
+        super(val);
+    }
+
+    public BigDecimalWrapper(int val, MathContext mc) {
+        super(val,mc);
+    }
+
+    public BigDecimalWrapper(long val) {
+        super(val);
+    }
+
+    public BigDecimalWrapper(long val, MathContext mc) {
+        super(val, mc);
+    }
+
+    public BigDecimalWrapper(String val) {
+        super(val);
+    }
+
+    public BigDecimalWrapper(String val, MathContext mc) {
+        super(val, mc);
+    }
+
+    private BigDecimalWrapper(BigDecimalInfinity in) {
+        super(0);
+        infinity = in;
+    }
+
+    public boolean isPositiveInfinity() {
+        return (infinity==BigDecimalInfinity.POSITIVE_INFINITY);
+    }
+
+    public boolean isNegativeInfinity() {
+        return (infinity==BigDecimalInfinity.NEGATIVE_INFINITY);
+    }
+
+    static public BigDecimalWrapper NEGATIVE_INFINITY() {
+        return new BigDecimalWrapper(BigDecimalInfinity.NEGATIVE_INFINITY);
+    }
+
+    static public BigDecimalWrapper POSITIVE_INFINITY() {
+        return new BigDecimalWrapper(BigDecimalInfinity.POSITIVE_INFINITY);
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/BigIntegerAvg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/BigIntegerAvg.java?rev=1543094&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/BigIntegerAvg.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/BigIntegerAvg.java Mon Nov 18 17:46:13 2013
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.MathContext;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.backend.executionengine.ExecException;
+
+
+/**
+ * Computes the average of a bag of {@link BigInteger} values as a {@link BigDecimal}.
+ * Null values are ignored; an empty bag or a bag holding only nulls yields null.
+ * This class should never be used directly, use {@link AVG}.
+ */
+public class BigIntegerAvg extends EvalFunc<BigDecimal> implements Algebraic, Accumulator<BigDecimal> {
+
+    private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    /**
+     * Averages the non-null values of the bag held in field 0 of {@code input}.
+     *
+     * @return the average (see {@link #div}), or null when the bag is empty or holds only nulls
+     */
+    @Override
+    public BigDecimal exec(Tuple input) throws IOException {
+        BigInteger sum = sum(input);
+        if (sum == null) {
+            // either we were handed an empty bag or a bag
+            // filled with nulls - return null in this case
+            return null;
+        }
+        BigInteger count = count(input);
+        return count.signum() > 0 ? div(sum, count) : null;
+    }
+
+    @Override
+    public String getInitial() {
+        return Initial.class.getName();
+    }
+
+    @Override
+    public String getIntermed() {
+        return Intermediate.class.getName();
+    }
+
+    @Override
+    public String getFinal() {
+        return Final.class.getName();
+    }
+
+    /** Initial stage: wraps the first value of the input bag as a (value, count) tuple. */
+    public static class Initial extends EvalFunc<Tuple> {
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                // input is a bag with one tuple containing
+                // the column we are trying to avg on
+                DataBag bg = (DataBag) input.get(0);
+                // walk a single iterator instead of asking the bag for a new one per call
+                Iterator<Tuple> it = bg.iterator();
+                BigInteger d = it.hasNext() ? (BigInteger) it.next().get(0) : null;
+                Tuple t = mTupleFactory.newTuple(2);
+                t.set(0, d);
+                // a null value must not be counted, so it is paired with a count of zero
+                t.set(1, d != null ? BigInteger.ONE : BigInteger.ZERO);
+                return t;
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                String msg = "Error while computing average in " + this.getClass().getSimpleName();
+                throw new ExecException(msg, errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    /** Intermediate stage: merges a bag of (sum, count) tuples into one such tuple. */
+    public static class Intermediate extends EvalFunc<Tuple> {
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                return combine((DataBag) input.get(0));
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                String msg = "Error while computing average in " + this.getClass().getSimpleName();
+                throw new ExecException(msg, errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    /** Final stage: merges the partial (sum, count) tuples and divides sum by count. */
+    public static class Final extends EvalFunc<BigDecimal> {
+        @Override
+        public BigDecimal exec(Tuple input) throws IOException {
+            try {
+                Tuple combined = combine((DataBag) input.get(0));
+                BigInteger sum = (BigInteger) combined.get(0);
+                if (sum == null) {
+                    // every partial sum was null, so there is nothing to average
+                    return null;
+                }
+                BigInteger count = (BigInteger) combined.get(1);
+                return count.signum() > 0 ? div(sum, count) : null;
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;
+                String msg = "Error while computing average in " + this.getClass().getSimpleName();
+                throw new ExecException(msg, errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    /**
+     * Adds up the partial sums (field 0) and partial counts (field 1) of the given tuples.
+     *
+     * @param values bag of (sum, count) tuples as produced by Initial or Intermediate
+     * @return a (sum, count) tuple; the sum is null when every partial sum was null
+     */
+    protected static Tuple combine(DataBag values) throws ExecException {
+        BigInteger sum = BigInteger.ZERO;
+        BigInteger count = BigInteger.ZERO;
+        boolean sawNonNull = false;
+
+        // combine is called from Intermediate and Final
+        // In either case, Initial would have been called
+        // before and would have sent in valid tuples
+        // Hence we don't need to check if incoming bag
+        // is empty
+        for (Tuple t : values) {
+            BigInteger d = (BigInteger) t.get(0);
+            // a null partial sum adds nothing to the total; its paired
+            // count (zero when it comes from Initial) is still added
+            if (d != null) {
+                sawNonNull = true;
+                sum = sum.add(d);
+            }
+            count = count.add((BigInteger) t.get(1));
+        }
+
+        Tuple output = mTupleFactory.newTuple(2);
+        output.set(0, sawNonNull ? sum : null);
+        output.set(1, count);
+        return output;
+    }
+
+    /**
+     * Counts the tuples of the bag in field 0 of {@code input} whose first field is non-null.
+     */
+    protected static BigInteger count(Tuple input) throws ExecException {
+        DataBag values = (DataBag) input.get(0);
+        BigInteger cnt = BigInteger.ZERO;
+        for (Tuple t : values) {
+            if (t != null && t.size() > 0 && t.get(0) != null) {
+                cnt = cnt.add(BigInteger.ONE);
+            }
+        }
+        return cnt;
+    }
+
+    /**
+     * Sums the non-null values of the bag in field 0 of {@code input}.
+     *
+     * @return the sum, or null when the bag is empty or holds only nulls
+     * @throws ExecException with code 2103 when a RuntimeException occurs while reading or adding
+     */
+    protected static BigInteger sum(Tuple input) throws ExecException, IOException {
+        DataBag values = (DataBag) input.get(0);
+
+        // if we were handed an empty bag, return NULL
+        if (values.size() == 0) {
+            return null;
+        }
+
+        BigInteger sum = BigInteger.ZERO;
+        boolean sawNonNull = false;
+        for (Tuple t : values) {
+            try {
+                BigInteger d = (BigInteger) t.get(0);
+                if (d == null) {
+                    continue;
+                }
+                sawNonNull = true;
+                sum = sum.add(d);
+            } catch (RuntimeException exp) {
+                int errCode = 2103;
+                String msg = "Problem while computing sum of BigIntegers.";
+                throw new ExecException(msg, errCode, PigException.BUG, exp);
+            }
+        }
+        return sawNonNull ? sum : null;
+    }
+
+    /**
+     * Divides {@code dividend} by {@code divisor} after converting both to BigDecimal.
+     *
+     * @param divisor must not be zero; callers in this class only pass positive counts
+     */
+    protected static BigDecimal div(BigInteger dividend, BigInteger divisor) {
+        // Averages use the IEEE 754R Decimal128 format: 34 digits and a
+        // rounding mode of HALF_EVEN, the IEEE 754R default
+        return new BigDecimal(dividend).divide(new BigDecimal(divisor), MathContext.DECIMAL128);
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(null, DataType.BIGDECIMAL));
+    }
+
+    /* Accumulator interface */
+
+    private BigInteger intermediateSum = null;
+    private BigInteger intermediateCount = null;
+
+    /** Folds the non-null values of one more batch (bag in field 0) into the running totals. */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            BigInteger sum = sum(b);
+            if (sum == null) {
+                // the batch was empty or held only nulls: nothing to add
+                return;
+            }
+            // set default values
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = BigInteger.ZERO;
+                intermediateCount = BigInteger.ZERO;
+            }
+
+            BigInteger count = count(b);
+            if (count.signum() > 0) {
+                intermediateCount = intermediateCount.add(count);
+                intermediateSum = intermediateSum.add(sum);
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Drops the running totals so the instance can start a fresh accumulation. */
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    /** Returns the average accumulated so far, or null when no non-null value was seen. */
+    @Override
+    public BigDecimal getValue() {
+        if (intermediateCount != null && intermediateCount.signum() > 0) {
+            return div(intermediateSum, intermediateCount);
+        }
+        return null;
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/BigIntegerMax.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/BigIntegerMax.java?rev=1543094&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/BigIntegerMax.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/BigIntegerMax.java Mon Nov 18 17:46:13 2013
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+/**
+ * This class should never be used directly, use {@link MAX}.
+ */
+public class BigIntegerMax extends AlgebraicBigIntegerMathBase {
+
+    public BigIntegerMax() {
+        setOp(KNOWN_OP.MAX); // this function's operation is MAX
+    }
+
+    public static class Intermediate extends AlgebraicBigIntegerMathBase.Intermediate {
+        @Override
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MAX; // same operation as the enclosing BigIntegerMax
+        }
+    }
+
+    public static class Final extends AlgebraicBigIntegerMathBase.Final {
+        @Override
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MAX; // same operation as the enclosing BigIntegerMax
+        }
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/BigIntegerMin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/BigIntegerMin.java?rev=1543094&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/BigIntegerMin.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/BigIntegerMin.java Mon Nov 18 17:46:13 2013
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+/**
+ * This class should never be used directly, use {@link MIN}.
+ */
+public class BigIntegerMin extends AlgebraicBigIntegerMathBase {
+
+    public BigIntegerMin() {
+        setOp(KNOWN_OP.MIN); // this function's operation is MIN
+    }
+
+    public static class Intermediate extends AlgebraicBigIntegerMathBase.Intermediate {
+        @Override
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MIN; // same operation as the enclosing BigIntegerMin
+        }
+    }
+
+    public static class Final extends AlgebraicBigIntegerMathBase.Final {
+        @Override
+        public KNOWN_OP getOp() {
+            return KNOWN_OP.MIN; // same operation as the enclosing BigIntegerMin
+        }
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/BigIntegerWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/BigIntegerWrapper.java?rev=1543094&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/BigIntegerWrapper.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/BigIntegerWrapper.java Mon Nov 18 17:46:13 2013
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.math.BigInteger;
+import java.util.Random;
+
+/**
+ * Max and min seeds cannot be expressed as plain BigInteger values because a BigInteger
+ * can grow as large as memory allows. This wrapper provides the seeds for the MIN and MAX
+ * functions in AlgebraicBigIntegerMathBase.java.
+ */
+
+public class BigIntegerWrapper extends BigInteger {
+    private static final long serialVersionUID = 1L;
+
+    private enum BigIntegerInfinity { NEGATIVE_INFINITY, POSITIVE_INFINITY }
+
+    // Which infinity this instance stands for; stays null when a public constructor is used.
+    private BigIntegerInfinity infinity;
+
+    // The public constructors only forward their arguments to the matching BigInteger constructor.
+    public BigIntegerWrapper(byte[] val) {
+        super(val);
+    }
+
+    public BigIntegerWrapper(int signum, byte[] magnitude) {
+        super(signum, magnitude);
+    }
+
+    public BigIntegerWrapper(int bitLength, int certainty, Random rnd) {
+        super(bitLength, certainty, rnd);
+    }
+
+    public BigIntegerWrapper(int numBits, Random rnd) {
+        super(numBits, rnd);
+    }
+
+    public BigIntegerWrapper(String val) {
+        super(val);
+    }
+
+    public BigIntegerWrapper(String val, int radix) {
+        super(val, radix);
+    }
+
+    // Sentinel constructor: the numeric value is a placeholder zero, only the marker matters.
+    private BigIntegerWrapper(BigIntegerInfinity marker) {
+        super("0");
+        this.infinity = marker;
+    }
+
+    public boolean isPositiveInfinity() {
+        return BigIntegerInfinity.POSITIVE_INFINITY == infinity;
+    }
+
+    public boolean isNegativeInfinity() {
+        return BigIntegerInfinity.NEGATIVE_INFINITY == infinity;
+    }
+
+    // Each factory call hands out a new sentinel instance.
+    public static BigIntegerWrapper NEGATIVE_INFINITY() {
+        return new BigIntegerWrapper(BigIntegerInfinity.NEGATIVE_INFINITY);
+    }
+
+    public static BigIntegerWrapper POSITIVE_INFINITY() {
+        return new BigIntegerWrapper(BigIntegerInfinity.POSITIVE_INFINITY);
+    }
+}

Modified: pig/trunk/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/MAX.java?rev=1543094&r1=1543093&r2=1543094&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/MAX.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/MAX.java Mon Nov 18 17:46:13 2013
@@ -53,8 +53,8 @@ public class MAX extends AlgebraicByteAr
         @Override
         public KNOWN_OP getOp() {
             return KNOWN_OP.MAX;
-            }
         }
+    }
 
     public static class Final extends AlgebraicByteArrayMathBase.Final {
         @Override
@@ -62,7 +62,7 @@ public class MAX extends AlgebraicByteAr
             return KNOWN_OP.MAX;
         }
     }
-    
+
     /* (non-Javadoc)
      * @see org.apache.pig.EvalFunc#getArgToFuncMapping()
      */
@@ -76,6 +76,8 @@ public class MAX extends AlgebraicByteAr
         funcList.add(new FuncSpec(LongMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
         funcList.add(new FuncSpec(StringMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));
         funcList.add(new FuncSpec(DateTimeMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.DATETIME)));
+        funcList.add(new FuncSpec(BigDecimalMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.BIGDECIMAL)));
+        funcList.add(new FuncSpec(BigIntegerMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.BIGINTEGER)));
         return funcList;
     }
 }

Modified: pig/trunk/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/MIN.java?rev=1543094&r1=1543093&r2=1543094&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/MIN.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/MIN.java Mon Nov 18 17:46:13 2013
@@ -53,8 +53,8 @@ public class MIN extends  AlgebraicByteA
         @Override
         public KNOWN_OP getOp() {
             return KNOWN_OP.MIN;
-            }
         }
+    }
 
     public static class Final extends AlgebraicByteArrayMathBase.Final {
         @Override
@@ -62,7 +62,7 @@ public class MIN extends  AlgebraicByteA
             return KNOWN_OP.MIN;
         }
     }
-    
+
     /* (non-Javadoc)
      * @see org.apache.pig.EvalFunc#getArgToFuncMapping()
      */
@@ -76,6 +76,8 @@ public class MIN extends  AlgebraicByteA
         funcList.add(new FuncSpec(LongMin.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
         funcList.add(new FuncSpec(StringMin.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));
         funcList.add(new FuncSpec(DateTimeMin.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.DATETIME)));
+        funcList.add(new FuncSpec(BigDecimalMin.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.BIGDECIMAL)));
+        funcList.add(new FuncSpec(BigIntegerMin.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.BIGINTEGER)));
         return funcList;
     }
 }