You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/16 01:26:56 UTC

svn commit: r1542433 - in /pig/branches/tez: ./ src/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/backend/hadoop/executionengine...

Author: cheolsoo
Date: Sat Nov 16 00:26:55 2013
New Revision: 1542433

URL: http://svn.apache.org/r1542433
Log:
Merge latest trunk changes into tez branch

Added:
    pig/branches/tez/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java
      - copied unchanged from r1542432, pig/trunk/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java
    pig/branches/tez/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java
      - copied unchanged from r1542432, pig/trunk/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java
    pig/branches/tez/src/org/apache/pig/builtin/BigDecimalSum.java
      - copied unchanged from r1542432, pig/trunk/src/org/apache/pig/builtin/BigDecimalSum.java
    pig/branches/tez/src/org/apache/pig/builtin/BigIntegerSum.java
      - copied unchanged from r1542432, pig/trunk/src/org/apache/pig/builtin/BigIntegerSum.java
Modified:
    pig/branches/tez/   (props changed)
    pig/branches/tez/CHANGES.txt
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    pig/branches/tez/src/org/apache/pig/builtin/SUM.java
    pig/branches/tez/src/pig-default.properties   (props changed)
    pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java
    pig/branches/tez/test/org/apache/pig/test/TestEqualTo.java
    pig/branches/tez/test/org/apache/pig/test/TestGTOrEqual.java
    pig/branches/tez/test/org/apache/pig/test/TestGreaterThan.java
    pig/branches/tez/test/org/apache/pig/test/TestLTOrEqual.java
    pig/branches/tez/test/org/apache/pig/test/TestLessThan.java
    pig/branches/tez/test/org/apache/pig/test/TestNotEqualTo.java
    pig/branches/tez/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2   (props changed)

Propchange: pig/branches/tez/
------------------------------------------------------------------------------
  Merged /pig/trunk:r1541676-1542432

Modified: pig/branches/tez/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/tez/CHANGES.txt?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/CHANGES.txt (original)
+++ pig/branches/tez/CHANGES.txt Sat Nov 16 00:26:55 2013
@@ -28,6 +28,8 @@ PIG-3419: Pluggable Execution Engine (ac
 
 IMPROVEMENTS
 
+PIG-3569: SUM function for BigDecimal and BigInteger (harichinnan via rohini)
+
 PIG-3505: Make AvroStorage sync interval take default from io.file.buffer.size (rohini)
 
 PIG-3563: support adding archives to the distributed cache (jdonofrio via cheolsoo)
@@ -50,6 +52,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3568: Define the semantics of POStatus.STATUS_NULL (mwagner via cheolsoo)
+
 PIG-3561: Clean up PigStats and JobStats after PIG-3419 (cheolsoo)
 
 PIG-3553: HadoopJobHistoryLoader fails to load job history on hadoop v 1.2 (lgiri via cheolsoo)
@@ -88,6 +92,7 @@ PIG-3469: Skewed join can cause unrecove
 
 PIG-3496: Propagate HBase 0.95 jars to the backend (Jarek Jarcec Cecho  via xuefuz)
 
+
 Release 0.12.1 (unreleased changes)
 
 IMPROVEMENTS
@@ -114,6 +119,7 @@ PIG-3512: Reducer estimater is broken by
 
 PIG-3510: New filter extractor fails with more than one filter statement (aniket486 via cheolsoo)
 
+
 Release 0.12.0
 
 INCOMPATIBLE CHANGES

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java Sat Nov 16 00:26:55 2013
@@ -17,23 +17,57 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer;
 
+/**
+ * POStatus is a set of flags used to communicate the status of Pig's operator
+ * pipeline to consumers.
+ */
 public class POStatus {
+
+    /**
+     * STATUS_OK indicates that the pull on the operator pipeline resulted in a
+     * valid output.
+     */
     public static final byte STATUS_OK = 0;
 
+    /**
+     * STATUS_NULL indicates that no output was produced, but there may be more
+     * results. This can happen if a value is filtered out or an empty bag is
+     * flattened. A caller will typically ignore the output and try again after
+     * seeing STATUS_NULL.
+     *
+     * This does NOT indicate that the output is the value 'null' (which is
+     * possible in expressions). This is represented as 'null' with STATUS_OK.
+     */
     public static final byte STATUS_NULL = 1;
 
+    /**
+     * STATUS_ERR indicates that there was a problem while trying to produce a
+     * result. This should be remembered and fed back to the user.
+     */
     public static final byte STATUS_ERR = 2;
 
-    public static final byte STATUS_EOP = 3; // end of processing
-
-    // This is currently only used in communications
-    // between ExecutableManager and POStream
-    public static final byte STATUS_EOS = 4; // end of Streaming output (i.e. output from streaming binary)
-
-    // successfully processing of a batch, used by accumulative UDFs
-    // this is used for accumulative UDFs
+    /**
+     * STATUS_EOP indicates that no output was produced, and no further outputs
+     * will be produced (e.g. all attached inputs have been consumed or a limit
+     * has reached its threshold). A caller will typically terminate or attach
+     * new inputs on seeing this status.
+     */
+    public static final byte STATUS_EOP = 3;
+
+    /**
+     * This is currently only used in communications between ExecutableManager
+     * and POStream. It indicates the end of Streaming output (i.e. output from
+     * streaming binary).
+     */
+    public static final byte STATUS_EOS = 4;
+
+    /**
+     * Successful processing of a batch. This is used for accumulative UDFs.
+     */
     public static final byte STATUS_BATCH_OK = 5;
 
-    // this signals that an accumulative UDF has already finished
+    /**
+     * This signals that an accumulative UDF has already finished.
+     */
     public static final byte STATUS_EARLY_TERMINATION = 6;
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Sat Nov 16 00:26:55 2013
@@ -265,28 +265,29 @@ public abstract class PhysicalOperator e
      */
     public Result processInput() throws ExecException {
         try {
-        Result res = new Result();
-        if (input == null && (inputs == null || inputs.size()==0)) {
-//            log.warn("No inputs found. Signaling End of Processing.");
-            res.returnStatus = POStatus.STATUS_EOP;
-            return res;
-        }
+            Result res = new Result();
+            if (input == null && (inputs == null || inputs.size() == 0)) {
+                // log.warn("No inputs found. Signaling End of Processing.");
+                res.returnStatus = POStatus.STATUS_EOP;
+                return res;
+            }
 
-        //Should be removed once the model is clear
-        if(getReporter()!=null) {
-            getReporter().progress();
-        }
+            // Should be removed once the model is clear
+            if (getReporter() != null) {
+                getReporter().progress();
+            }
 
-        if (!isInputAttached()) {
+            if (!isInputAttached()) {
                 return inputs.get(0).getNextTuple();
-        } else {
-            res.result = input;
-            res.returnStatus = (res.result == null ? POStatus.STATUS_NULL: POStatus.STATUS_OK);
-            detachInput();
-            return res;
-        }
+            } else {
+                res.result = input;
+                res.returnStatus = POStatus.STATUS_OK;
+                detachInput();
+                return res;
+            }
         } catch (ExecException e) {
-            throw new ExecException("Exception while executing " + this.toString() + ": " + e.toString(), e);
+            throw new ExecException("Exception while executing "
+                    + this.toString() + ": " + e.toString(), e);
         }
     }
 
@@ -382,11 +383,14 @@ public abstract class PhysicalOperator e
     public Result getNextDataBag() throws ExecException {
         Result ret = null;
         DataBag tmpBag = BagFactory.getInstance().newDefaultBag();
-        for(ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()){
-            if(ret.returnStatus == POStatus.STATUS_ERR) {
+        for (ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) {
+            if (ret.returnStatus == POStatus.STATUS_ERR) {
                 return ret;
+            } else if (ret.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            } else {
+                tmpBag.add((Tuple) ret.result);
             }
-            tmpBag.add((Tuple)ret.result);
         }
         ret.result = tmpBag;
         ret.returnStatus = (tmpBag.size() == 0)? POStatus.STATUS_EOP : POStatus.STATUS_OK;
@@ -449,15 +453,15 @@ public abstract class PhysicalOperator e
     }
 
     public Log getLogger() {
-    	return log;
+        return log;
     }
 
     public static void setPigLogger(PigLogger logger) {
-    	pigLogger = logger;
+        pigLogger = logger;
     }
 
     public static PigLogger getPigLogger() {
-    	return pigLogger;
+        return pigLogger;
     }
 
     public static class OriginalLocation implements Serializable {
@@ -470,7 +474,7 @@ public abstract class PhysicalOperator e
             this.alias = alias;
             this.line = line;
             this.offset = offset;
-}
+        }
 
         public String getAlias() {
             return alias;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java Sat Nov 16 00:26:55 2013
@@ -110,7 +110,7 @@ public class EqualToExpr extends BinaryC
         // null
         if(left.result == null || right.result == null) {
             left.result = null;
-            left.returnStatus = POStatus.STATUS_NULL;
+            left.returnStatus = POStatus.STATUS_OK;
             return left;
         }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java Sat Nov 16 00:26:55 2013
@@ -101,7 +101,7 @@ public class GTOrEqualToExpr extends Bin
         // null
         if(left.result == null || right.result == null) {
             left.result = null;
-            left.returnStatus = POStatus.STATUS_NULL;
+            left.returnStatus = POStatus.STATUS_OK;
             return left;
         }
         assert(left.result instanceof Comparable);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java Sat Nov 16 00:26:55 2013
@@ -97,7 +97,7 @@ public class GreaterThanExpr extends Bin
         // null
         if(left.result == null || right.result == null) {
             left.result = null;
-            left.returnStatus = POStatus.STATUS_NULL;
+            left.returnStatus = POStatus.STATUS_OK;
             return left;
         }
         assert(left.result instanceof Comparable);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java Sat Nov 16 00:26:55 2013
@@ -100,7 +100,7 @@ public class LTOrEqualToExpr extends Bin
         // null
         if(left.result == null || right.result == null) {
             left.result = null;
-            left.returnStatus = POStatus.STATUS_NULL;
+            left.returnStatus = POStatus.STATUS_OK;
             return left;
         }
         assert(left.result instanceof Comparable);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java Sat Nov 16 00:26:55 2013
@@ -101,7 +101,7 @@ public class LessThanExpr extends Binary
         // null
         if(left.result == null || right.result == null) {
             left.result = null;
-            left.returnStatus = POStatus.STATUS_NULL;
+            left.returnStatus = POStatus.STATUS_OK;
             return left;
         }
         assert(left.result instanceof Comparable);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java Sat Nov 16 00:26:55 2013
@@ -105,7 +105,7 @@ public class NotEqualToExpr extends Bina
         // null
         if(left.result == null || right.result == null) {
             left.result = null;
-            left.returnStatus = POStatus.STATUS_NULL;
+            left.returnStatus = POStatus.STATUS_OK;
             return left;
         }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java Sat Nov 16 00:26:55 2013
@@ -22,8 +22,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.DataType;
-import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -65,8 +65,8 @@ public class POAnd extends BinaryCompari
         
         Result left;
         left = lhs.getNextBoolean();
-        // pass on ERROR and EOP 
-        if(left.returnStatus != POStatus.STATUS_OK && left.returnStatus != POStatus.STATUS_NULL) {
+        // pass on ERROR and EOP and NULL
+        if (left.returnStatus != POStatus.STATUS_OK) {
             return left;
         }
         
@@ -93,7 +93,7 @@ public class POAnd extends BinaryCompari
         }
         
         // pass on ERROR and EOP 
-        if(right.returnStatus != POStatus.STATUS_OK && right.returnStatus != POStatus.STATUS_NULL) {
+        if (right.returnStatus != POStatus.STATUS_OK) {
             return right;
         }
         

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java Sat Nov 16 00:26:55 2013
@@ -22,8 +22,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.DataType;
-import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -65,8 +65,8 @@ public class POOr extends BinaryComparis
         
         Result left;
         left = lhs.getNextBoolean();
-        // pass on ERROR and EOP
-        if(left.returnStatus != POStatus.STATUS_OK && left.returnStatus != POStatus.STATUS_NULL) {
+        // pass on ERROR, EOP, and NULL
+        if (left.returnStatus != POStatus.STATUS_OK) {
             return left;
         }
         
@@ -91,8 +91,8 @@ public class POOr extends BinaryComparis
         if (returnLeft)
             return left;
 
-        // pass on ERROR and EOP 
-        if(right.returnStatus != POStatus.STATUS_OK && right.returnStatus != POStatus.STATUS_NULL) {
+        // pass on ERROR, EOP, and NULL
+        if (right.returnStatus != POStatus.STATUS_OK) {
             return right;
         }
         

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Sat Nov 16 00:26:55 2013
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -310,7 +309,7 @@ public class POProject extends Expressio
         } else if (input.result==null) {
             Result retVal = new Result();
             retVal.result = null;
-            retVal.returnStatus = POStatus.STATUS_NULL;
+            retVal.returnStatus = POStatus.STATUS_OK;
             return retVal;
         } else {
             throw new ExecException("Cannot dereference a bag from " + input.result.getClass().getName(), 1129);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java Sat Nov 16 00:26:55 2013
@@ -25,7 +25,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.NonSpillableDataBag;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -112,7 +111,9 @@ public class PORelationToExprProject ext
         }
         
         if(input.returnStatus!=POStatus.STATUS_OK) {
-            if(input.returnStatus == POStatus.STATUS_EOP && sendEmptyBagOnEOP)  {
+            if(input.returnStatus == POStatus.STATUS_NULL){
+                return input;
+            } else if (input.returnStatus == POStatus.STATUS_EOP && sendEmptyBagOnEOP)  {
                 // we received an EOP from the predecessor
                 // since the successor in the pipeline is
                 // expecting a bag, send an empty bag

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Sat Nov 16 00:26:55 2013
@@ -27,12 +27,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.pig.data.Tuple;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryComparisonOperator;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
@@ -52,9 +52,9 @@ public class PhysicalPlan extends Operat
     private static final long serialVersionUID = 1L;
     
     // marker to indicate whether all input for this plan
-    // has been sent - this is currently only used in POStream
+    // has been sent - this is currently used in POStream
     // to know if all map() calls and reduce() calls are finished
-    // and that there is no more input expected.
+    // and that there is no more input expected and in POPartialAgg.
     public boolean endOfAllInput = false;
 
     private MultiMap<PhysicalOperator, PhysicalOperator> opmap = null;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Sat Nov 16 00:26:55 2013
@@ -19,32 +19,24 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.HashSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.DistinctDataBag;
 import org.apache.pig.data.InternalDistinctBag;
-import org.apache.pig.data.InternalSortedBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
-import org.apache.pig.impl.util.IdentityHashSet;
 
 /**
  * Find the distinct set of tuples in a bag.
@@ -95,37 +87,34 @@ public class PODistinct extends Physical
     @Override
     public Result getNextTuple() throws ExecException {
          if (!inputsAccumulated) {
-            Result in = processInput();    
-            
             // by default, we create InternalSortedBag, unless user configures
-			// explicitly to use old bag
-           	String bagType = null;
+            // explicitly to use old bag
+            String bagType = null;
             if (PigMapReduce.sJobConfInternal.get() != null) {
-       			bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.distinct.type");       			
-       	    }            
-            if (bagType != null && bagType.equalsIgnoreCase("default")) {        	    	
-            	distinctBag = BagFactory.getInstance().newDistinctBag();    			
-       	    } else {
-       	    	distinctBag = new InternalDistinctBag(3);
-    	    }
-            
+                bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.distinct.type");
+            }
+            if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                distinctBag = BagFactory.getInstance().newDistinctBag();
+            } else {
+                distinctBag = new InternalDistinctBag(3);
+            }
+
+            Result in = processInput();
             while (in.returnStatus != POStatus.STATUS_EOP) {
                 if (in.returnStatus == POStatus.STATUS_ERR) {
                     log.error("Error in reading from inputs");
                     return in;
-                    //continue;
                 } else if (in.returnStatus == POStatus.STATUS_NULL) {
-                    // Ignore the null, read the next tuple.  It's not clear
-                    // to me that we should ever get this, or if we should, 
-                    // how it differs from EOP.  But ignoring it here seems
-                    // to work.
+                    // Ignore and read the next tuple.
                     in = processInput();
                     continue;
+                } else {
+                    distinctBag.add((Tuple) in.result);
+                    illustratorMarkup(in.result, in.result, 0);
+                    in = processInput();
                 }
-                distinctBag.add((Tuple) in.result);
-                illustratorMarkup(in.result, in.result, 0);
-                in = processInput();
             }
+
             inputsAccumulated = true;
         }
         if (it == null) {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java Sat Nov 16 00:26:55 2013
@@ -18,21 +18,17 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.List;
-import java.util.LinkedList;
 
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
 
 /**
  * This is an implementation of the Filter operator. It has an Expression Plan
@@ -147,8 +143,7 @@ public class POFilter extends PhysicalOp
             */
             res = comOp.getNextBoolean();
             plan.detachInput();
-            if (res.returnStatus != POStatus.STATUS_OK 
-                    && res.returnStatus != POStatus.STATUS_NULL) 
+            if (res.returnStatus != POStatus.STATUS_OK)
                 return res;
 
             if (res.result != null) {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Sat Nov 16 00:26:55 2013
@@ -308,9 +308,8 @@ public class POLocalRearrange extends Ph
                     break;
                 }
 
-                // allow null as group by key
-                if (res.returnStatus != POStatus.STATUS_OK && res.returnStatus != POStatus.STATUS_NULL) {
-                    return new Result();
+                if (res.returnStatus != POStatus.STATUS_OK) {
+                    return res;
                 }
 
                 resLst.add(res);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Sat Nov 16 00:26:55 2013
@@ -60,7 +60,6 @@ public class POPartialAgg extends Physic
     private static final Log LOG = LogFactory.getLog(POPartialAgg.class);
     private static final long serialVersionUID = 1L;
 
-    private static final Result ERR_RESULT = new Result();
     private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP,
             null);
 
@@ -155,10 +154,11 @@ public class POPartialAgg extends Physic
             if (doSpill) {
                 startSpill();
                 Result result = spillResult();
-                if (result == EOP_RESULT) {
+                if (result.returnStatus == POStatus.STATUS_EOP) {
                     doSpill = false;
                 }
-                if (result != EOP_RESULT || inputsExhausted) {
+                if (result.returnStatus != POStatus.STATUS_EOP
+                        || inputsExhausted) {
                     return result;
                 }
             }
@@ -189,8 +189,8 @@ public class POPartialAgg extends Physic
 
                     // evaluate the key
                     Result keyRes = getResult(keyLeaf);
-                    if (keyRes == ERR_RESULT) {
-                        return ERR_RESULT;
+                    if (keyRes.returnStatus != POStatus.STATUS_OK) {
+                        return keyRes;
                     }
                     Object key = keyRes.result;
                     keyPlan.detachInput();
@@ -448,7 +448,7 @@ public class POPartialAgg extends Physic
 
 
     private Result getResult(ExpressionOperator op) throws ExecException {
-        Result res = ERR_RESULT;
+        Result res;
         switch (op.getResultType()) {
         case DataType.BAG:
         case DataType.BOOLEAN:
@@ -471,12 +471,7 @@ public class POPartialAgg extends Physic
             throw new ExecException(msg, 2270, PigException.BUG);
         }
 
-        // allow null as group by key
-        if (res.returnStatus == POStatus.STATUS_OK
-                || res.returnStatus == POStatus.STATUS_NULL) {
-            return res;
-        }
-        return ERR_RESULT;
+        return res;
     }
 
     /**
@@ -493,8 +488,8 @@ public class POPartialAgg extends Physic
         for (int i = 0; i < valuePlans.size(); i++) {
             valuePlans.get(i).attachInput(value);
             Result valRes = getResult(valueLeaves.get(i));
-            if (valRes == ERR_RESULT) {
-                return ERR_RESULT;
+            if (valRes.returnStatus == POStatus.STATUS_ERR) {
+                return valRes;
             }
             output.set(i + 1, valRes.result);
         }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java Sat Nov 16 00:26:55 2013
@@ -159,10 +159,8 @@ public class POPreCombinerLocalRearrange
                     break;
                 }
 
-                // allow null as group by key
-                if (res.returnStatus != POStatus.STATUS_OK
-                        && res.returnStatus != POStatus.STATUS_NULL) {
-                    return new Result();
+                if (res.returnStatus != POStatus.STATUS_OK) {
+                    return res;
                 }
 
                 resLst.add(res);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Sat Nov 16 00:26:55 2013
@@ -268,20 +268,18 @@ public class POSort extends PhysicalOper
     	    	sortedBag = new InternalSortedBag(3, mComparator);
     	    }
 
-			while (res.returnStatus != POStatus.STATUS_EOP) {
+            while (res.returnStatus != POStatus.STATUS_EOP) {
 				if (res.returnStatus == POStatus.STATUS_ERR) {
 					log.error("Error in reading from the inputs");
 					return res;
-					//continue;
-				} else if (res.returnStatus == POStatus.STATUS_NULL) {
-                    // ignore the null, read the next tuple.
+                } else if (res.returnStatus == POStatus.STATUS_NULL) {
+                    // Ignore and read the next tuple.
                     res = processInput();
-					continue;
-				}
+                    continue;
+                }
 				sortedBag.add((Tuple) res.result);
 				res = processInput();
-
-			}
+            }
 
 			inputsAccumulated = true;
 

Modified: pig/branches/tez/src/org/apache/pig/builtin/SUM.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/SUM.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/SUM.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/SUM.java Sat Nov 16 00:26:55 2013
@@ -77,6 +77,11 @@ public class SUM extends AlgebraicByteAr
         // LongSum works for both Ints and Longs.
         funcList.add(new FuncSpec(LongSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
         funcList.add(new FuncSpec(LongSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
+        //Adding BigDecimal
+        funcList.add(new FuncSpec(BigDecimalSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.BIGDECIMAL)));
+        //dding BigInteger
+        funcList.add(new FuncSpec(BigIntegerSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.BIGINTEGER)));
+
         return funcList;
     }
 

Propchange: pig/branches/tez/src/pig-default.properties
------------------------------------------------------------------------------
  Merged /pig/trunk/src/pig-default.properties:r1541676-1542432

Modified: pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java Sat Nov 16 00:26:55 2013
@@ -37,6 +37,8 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.StringTokenizer;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
@@ -150,6 +152,9 @@ public class TestBuiltin {
 
     private static Double[] doubleInput = { 5.5673910, 121.0, 3.0, 0.000000834593, 1.0, 6.0, 7.0, 8.0, 9.0, 10.0, null };
 
+    private static BigDecimal[] bigDecimalInput = {BigDecimal.ZERO, BigDecimal.ONE, BigDecimal.TEN, new BigDecimal("99999999999999977.9999999999999999999999999999999999999999")};
+    private static BigInteger[] bigIntegerInput = {BigInteger.ZERO, BigInteger.ONE, BigInteger.TEN, new BigInteger("999999999998888888887777777777777744444488888889999999999977")};
+
     private static String[] ba = { "7", "2", "3", null, "4", "5", "6", "1", "8", "9", "10"};
     private static Double[] baAsDouble = { 7.0, 2.0, 3.0, null, 4.0, 5.0, 6.0, 1.0, 8.0, 9.0, 10.0};
 
@@ -183,7 +188,7 @@ public class TestBuiltin {
     String[] stages = {"Initial", "Intermediate", "Final"};
 
     String[][] aggs = {
-            {"SUM", "IntSum", "LongSum", "FloatSum", "DoubleSum"},
+            {"SUM", "IntSum", "LongSum", "FloatSum", "DoubleSum", "BigDecimalSum", "BigIntegerSum"},
             {"AVG", "IntAvg", "LongAvg", "FloatAvg", "DoubleAvg"},
             {"MIN", "IntMin", "LongMin", "FloatMin", "DoubleMin", "StringMin", "DateTimeMin"},
             {"MAX", "IntMax", "LongMax", "FloatMax", "DoubleMax", "StringMax", "DateTimeMax"},
@@ -192,6 +197,7 @@ public class TestBuiltin {
 
     String[] inputTypeAsString = {"ByteArray", "Integer", "Long", "Float", "Double", "String", "DateTime"};
 
+
     @Before
     public void setUp() throws Exception {
 
@@ -214,6 +220,8 @@ public class TestBuiltin {
         expectedMap.put("IntSum", new Long(55));
         expectedMap.put("LongSum", new Long(145776964666362L));
         expectedMap.put("FloatSum", new Double(56.15395));
+        expectedMap.put("BigDecimalSum", new BigDecimal("99999999999999988.9999999999999999999999999999999999999999"));
+        expectedMap.put("BigIntegerSum", new BigInteger("999999999998888888887777777777777744444488888889999999999988"));
 
         expectedMap.put("AVG", new Double(5.5));
         expectedMap.put("DoubleAvg", new Double(17.0567391834593));
@@ -243,7 +251,14 @@ public class TestBuiltin {
         for (String[] aggGroups : aggs) {
             int i = 0;
             for(String agg: aggGroups) {
-                allowedInput.put(agg, inputTypeAsString[i++]);
+                //This portion could be reverted once MIN, MAX and AVG are implemented for BigDecimal and BigInteger
+                if(agg == "BigDecimalSum")
+                        allowedInput.put(agg, "BigDecimal");
+                else if(agg == "BigIntegerSum")
+                        allowedInput.put(agg,"BigInteger");
+                else
+                    allowedInput.put(agg, inputTypeAsString[i++]);
+                
             }
         }
 
@@ -307,6 +322,7 @@ public class TestBuiltin {
         expectedMap.put("IntAvgIntermediate", expectedMap.get("IntSum"));
         expectedMap.put("FloatAvgIntermediate", expectedMap.get("FloatSum"));
 
+
         // set up input hash
             inputMap.put("Integer", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), intInput));
             inputMap.put("IntegerAsLong", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), intAsLong));
@@ -314,6 +330,8 @@ public class TestBuiltin {
             inputMap.put("Float", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), floatInput));
             inputMap.put("FloatAsDouble", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), floatAsDouble));
             inputMap.put("Double", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), doubleInput));
+            inputMap.put("BigDecimal", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), bigDecimalInput));
+            inputMap.put("BigInteger", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), bigIntegerInput));
             inputMap.put("ByteArray", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), ByteArrayInput));
             inputMap.put("ByteArrayAsDouble", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), baAsDouble));
             inputMap.put("String", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), stringInput));
@@ -1273,7 +1291,7 @@ public class TestBuiltin {
 
     @Test
     public void testSUM() throws Exception {
-        String[] sumTypes = {"SUM", "DoubleSum", "LongSum", "IntSum", "FloatSum"};
+        String[] sumTypes = {"SUM", "DoubleSum", "LongSum", "IntSum", "FloatSum", "BigDecimalSum", "BigIntegerSum"};
         for(int k = 0; k < sumTypes.length; k++) {
             EvalFunc<?> sum = evalFuncMap.get(sumTypes[k]);
             String inputType = getInputType(sumTypes[k]);
@@ -1285,7 +1303,13 @@ public class TestBuiltin {
 
             if(inputType == "Integer" || inputType == "Long") {
                 assertEquals(msg, (Long)output, (Long)getExpected(sumTypes[k]), 0.00001);
-            } else {
+            }
+            //Assert Equals does not support BigDecimal or BigInteger. Converting into String
+            else if(inputType == "BigDecimal")
+                assertEquals(msg, ((BigDecimal) output).toPlainString(), ((BigDecimal)getExpected(sumTypes[k])).toPlainString());
+            else if(inputType == "BigInteger")
+                assertEquals(msg, ((BigInteger) output).toString(), ((BigInteger)getExpected(sumTypes[k])).toString());
+            else {
                 assertEquals(msg, (Double)output, (Double)getExpected(sumTypes[k]), 0.00001);
             }
         }
@@ -1293,7 +1317,7 @@ public class TestBuiltin {
 
     @Test
     public void testSUMIntermed() throws Exception {
-        String[] sumTypes = {"SUMIntermediate", "DoubleSumIntermediate", "LongSumIntermediate", "IntSumIntermediate", "FloatSumIntermediate"};
+        String[] sumTypes = {"SUMIntermediate", "DoubleSumIntermediate", "LongSumIntermediate", "IntSumIntermediate", "FloatSumIntermediate", "BigDecimalSumIntermediate", "BigIntegerSumIntermediate"};
         for(int k = 0; k < sumTypes.length; k++) {
             EvalFunc<?> sum = evalFuncMap.get(sumTypes[k]);
             String inputType = getInputType(sumTypes[k]);
@@ -1305,7 +1329,13 @@ public class TestBuiltin {
                             ((Tuple)output).get(0) + " == " + getExpected(sumTypes[k]) + " (expected) )]";
             if(inputType.equals("Integer") || inputType.equals("Long") || inputType.equals("IntegerAsLong")) {
               assertEquals(msg, (Long) ((Tuple)output).get(0), (Long)getExpected(sumTypes[k]), 0.00001);
-            } else {
+            }
+            //Assert Equals does not support BigDecimal or BigInteger. Converting into String
+            else if(inputType == "BigDecimal")
+                assertEquals(msg, ((BigDecimal) ((Tuple)output).get(0)).toPlainString(), ((BigDecimal)getExpected(sumTypes[k])).toPlainString());
+            else if(inputType == "BigInteger")
+                assertEquals(msg, ((BigInteger) ((Tuple)output).get(0)).toString(), ((BigInteger)getExpected(sumTypes[k])).toString()); 
+            else {
               assertEquals(msg, (Double) ((Tuple)output).get(0), (Double)getExpected(sumTypes[k]), 0.00001);
             }
         }
@@ -1313,7 +1343,7 @@ public class TestBuiltin {
 
     @Test
     public void testSUMFinal() throws Exception {
-        String[] sumTypes = {"SUMFinal", "DoubleSumFinal", "LongSumFinal", "IntSumFinal", "FloatSumFinal"};
+        String[] sumTypes = {"SUMFinal", "DoubleSumFinal", "LongSumFinal", "IntSumFinal", "FloatSumFinal", "BigDecimalSumFinal", "BigIntegerSumFinal"};
         for(int k = 0; k < sumTypes.length; k++) {
             EvalFunc<?> sum = evalFuncMap.get(sumTypes[k]);
             String inputType = getInputType(sumTypes[k]);
@@ -1325,7 +1355,13 @@ public class TestBuiltin {
 
             if(inputType.equals("Integer") || inputType.equals("Long") || inputType.equals("IntegerAsLong")) {
               assertEquals(msg, (Long)output, (Long)getExpected(sumTypes[k]), 0.00001);
-            } else {
+            }
+            //Assert Equals does not support BigDecimal or BigInteger. Converting into String
+            else if(inputType == "BigDecimal")
+                assertEquals(msg, ((BigDecimal) output).toPlainString(), ((BigDecimal)getExpected(sumTypes[k])).toPlainString());
+            else if(inputType == "BigInteger")
+                assertEquals(msg, ((BigInteger) output).toString(), ((BigInteger)getExpected(sumTypes[k])).toString()); 
+            else {
               assertEquals(msg, (Double)output, (Double)getExpected(sumTypes[k]), 0.00001);
             }
         }

Modified: pig/branches/tez/test/org/apache/pig/test/TestEqualTo.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestEqualTo.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestEqualTo.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestEqualTo.java Sat Nov 16 00:26:55 2013
@@ -406,7 +406,7 @@ public class TestEqualTo {
         g.setRhs(rt);
 
         Result r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
 
         // test with null in rhs
@@ -417,7 +417,7 @@ public class TestEqualTo {
         g.setRhs(rt);
 
         r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
 
         // test with null in lhs and rhs
@@ -428,7 +428,7 @@ public class TestEqualTo {
         g.setRhs(rt);
 
         r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
     }
 }

Modified: pig/branches/tez/test/org/apache/pig/test/TestGTOrEqual.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestGTOrEqual.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestGTOrEqual.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestGTOrEqual.java Sat Nov 16 00:26:55 2013
@@ -400,7 +400,7 @@ public class TestGTOrEqual {
         g.setRhs(rt);
 
         Result r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
 
         // test with null in rhs
@@ -411,7 +411,7 @@ public class TestGTOrEqual {
         g.setRhs(rt);
 
         r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
 
         // test with null in lhs and rhs
@@ -422,7 +422,7 @@ public class TestGTOrEqual {
         g.setRhs(rt);
 
         r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
     }
 }

Modified: pig/branches/tez/test/org/apache/pig/test/TestGreaterThan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestGreaterThan.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestGreaterThan.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestGreaterThan.java Sat Nov 16 00:26:55 2013
@@ -397,7 +397,7 @@ public class TestGreaterThan {
         g.setRhs(rt);
 
         Result r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
 
         // test with null in rhs
@@ -408,7 +408,7 @@ public class TestGreaterThan {
         g.setRhs(rt);
 
         r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
 
         // test with null in lhs and rhs
@@ -419,7 +419,7 @@ public class TestGreaterThan {
         g.setRhs(rt);
 
         r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
     }
 

Modified: pig/branches/tez/test/org/apache/pig/test/TestLTOrEqual.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestLTOrEqual.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestLTOrEqual.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestLTOrEqual.java Sat Nov 16 00:26:55 2013
@@ -394,7 +394,7 @@ public class TestLTOrEqual {
         g.setRhs(rt);
 
         Result r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull((Boolean)r.result);
 
         // test with null in rhs
@@ -405,7 +405,7 @@ public class TestLTOrEqual {
         g.setRhs(rt);
 
         r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull((Boolean)r.result);
 
         // test with null in lhs and rhs
@@ -416,7 +416,7 @@ public class TestLTOrEqual {
         g.setRhs(rt);
 
         r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull((Boolean)r.result);
 
     }

Modified: pig/branches/tez/test/org/apache/pig/test/TestLessThan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestLessThan.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestLessThan.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestLessThan.java Sat Nov 16 00:26:55 2013
@@ -400,7 +400,7 @@ public class TestLessThan {
         g.setRhs(rt);
 
         Result r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
 
         // test with null in rhs
@@ -411,7 +411,7 @@ public class TestLessThan {
         g.setRhs(rt);
 
         r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
 
         // test with null in lhs and rhs
@@ -422,7 +422,7 @@ public class TestLessThan {
         g.setRhs(rt);
 
         r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
     }
 }
\ No newline at end of file

Modified: pig/branches/tez/test/org/apache/pig/test/TestNotEqualTo.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestNotEqualTo.java?rev=1542433&r1=1542432&r2=1542433&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestNotEqualTo.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestNotEqualTo.java Sat Nov 16 00:26:55 2013
@@ -414,7 +414,7 @@ public class TestNotEqualTo {
         g.setRhs(rt);
 
         Result r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
 
         // test with null in rhs
@@ -425,7 +425,7 @@ public class TestNotEqualTo {
         g.setRhs(rt);
 
         r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
 
         // test with null in lhs and rhs
@@ -436,7 +436,7 @@ public class TestNotEqualTo {
         g.setRhs(rt);
 
         r = g.getNextBoolean();
-        assertEquals(POStatus.STATUS_NULL, r.returnStatus);
+        assertEquals(POStatus.STATUS_OK, r.returnStatus);
         assertNull(r.result);
     }
 }

Propchange: pig/branches/tez/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
  Merged /pig/trunk/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2:r1541676-1542432