You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2010/12/23 02:33:45 UTC

svn commit: r1052127 [1/3] - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/backend/hadoop/executionengine/physi...

Author: dvryaboy
Date: Thu Dec 23 01:33:44 2010
New Revision: 1052127

URL: http://svn.apache.org/viewvc?rev=1052127&view=rev
Log:
PIG-1755: Clean up duplicated code in PhysicalOperators

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    pig/trunk/src/org/apache/pig/data/DataType.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Dec 23 01:33:44 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1755: Clean up duplicated code in PhysicalOperators (dvryaboy)
+
 PIG-750: Use combiner when algebraic UDFs are used in expressions (thejas)
 
 PIG-490: Combiner not used when group elements referred to in 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Thu Dec 23 01:33:44 2010
@@ -39,18 +39,18 @@ import org.apache.pig.pen.Illustrator;
 import org.apache.pig.pen.Illustrable;
 
 /**
- * 
+ *
  * This is the base class for all operators. This supports a generic way of
  * processing inputs which can be overridden by operators extending this class.
  * The input model assumes that it can either be taken from an operator or can
  * be attached directly to this operator. Also it is assumed that inputs to an
  * operator are always in the form of a tuple.
- * 
+ *
  * For this pipeline rework, we assume a pull based model, i.e, the root
  * operator is going to call getNext with the appropriate type which initiates a
  * cascade of getNext calls that unroll to create input for the root operator to
  * work on.
- * 
+ *
  * Any operator that extends the PhysicalOperator, supports a getNext with all
  * the different types of parameter types. The concrete implementation should
  * use the result type of its input operator to decide the type of getNext's
@@ -62,7 +62,7 @@ import org.apache.pig.pen.Illustrable;
  */
 public abstract class PhysicalOperator extends Operator<PhyPlanVisitor> implements Illustrable, Cloneable {
 
-    private Log log = LogFactory.getLog(getClass());
+    private final Log log = LogFactory.getLog(getClass());
 
     protected static final long serialVersionUID = 1L;
 
@@ -81,7 +81,7 @@ public abstract class PhysicalOperator e
 
     // The physical plan this operator is part of
     protected PhysicalPlan parentPlan;
-    
+
     // Specifies if the input has been directly attached
     protected boolean inputAttached = false;
 
@@ -90,11 +90,11 @@ public abstract class PhysicalOperator e
 
     // The result of performing the operation along with the output
     protected Result res = null;
-    
-    
+
+
     // alias associated with this PhysicalOperator
     protected String alias = null;
-    
+
     // Will be used by operators to report status or transmit heartbeat
     // Should be set by the backends to appropriate implementations that
     // wrap their own version of a reporter.
@@ -126,7 +126,7 @@ public abstract class PhysicalOperator e
     static final protected DataBag dummyBag = null;
 
     static final protected Map dummyMap = null;
-    
+
     // TODO: This is not needed. But a lot of tests check serialized physical plans
     // that are sensitive to the serialized image of the contained physical operators.
     // So for now, just keep it. Later it'll be cleansed along with those test golden
@@ -161,11 +161,11 @@ public abstract class PhysicalOperator e
     public void setIllustrator(Illustrator illustrator) {
 	      this.illustrator = illustrator;
     }
-    
+
     public Illustrator getIllustrator() {
         return illustrator;
     }
-    
+
     public int getRequestedParallelism() {
         return requestedParallelism;
     }
@@ -181,16 +181,16 @@ public abstract class PhysicalOperator e
     public String getAlias() {
         return alias;
     }
-    
+
     protected String getAliasString() {
         return (alias == null) ? "" : (alias + ": ");
     }
-    
+
     public void setAlias(String alias) {
         this.alias = alias;
     }
-    
-    public void setAccumulative() {    	
+
+    public void setAccumulative() {
         accum = true;
     }
 
@@ -208,7 +208,7 @@ public abstract class PhysicalOperator e
     public boolean isAccumStarted() {
     	return accumStart;
     }
-    
+
     public void setAccumEnd() {
        if (!accum){
     	   throw new IllegalStateException("Accumulative is not turned on.");
@@ -235,7 +235,7 @@ public abstract class PhysicalOperator e
     /**
      * Shorts the input path of this operator by providing the input tuple
      * directly
-     * 
+     *
      * @param t -
      *            The tuple that should be used as input
      */
@@ -246,7 +246,7 @@ public abstract class PhysicalOperator e
 
     /**
      * Detaches any tuples that are attached
-     * 
+     *
      */
     public void detachInput() {
         input = null;
@@ -258,7 +258,7 @@ public abstract class PhysicalOperator e
      * operators are those that need the full bag before operate on the tuples
      * inside the bag. Example is the Global Rearrange. Non-blocking or pipeline
      * operators are those that work on a tuple by tuple basis.
-     * 
+     *
      * @return true if blocking and false otherwise
      */
     public boolean isBlocking() {
@@ -269,22 +269,24 @@ public abstract class PhysicalOperator e
      * A generic method for parsing input that either returns the attached input
      * if it exists or fetches it from its predecessor. If special processing is
      * required, this method should be overridden.
-     * 
+     *
      * @return The Result object that results from processing the input
      * @throws ExecException
      */
     public Result processInput() throws ExecException {
-        
+
         Result res = new Result();
         if (input == null && (inputs == null || inputs.size()==0)) {
 //            log.warn("No inputs found. Signaling End of Processing.");
             res.returnStatus = POStatus.STATUS_EOP;
             return res;
         }
-        
+
         //Should be removed once the model is clear
-        if(reporter!=null) reporter.progress();
-            
+        if(reporter!=null) {
+            reporter.progress();
+        }
+
         if (!isInputAttached()) {
             return inputs.get(0).getNext(dummyTuple);
         } else {
@@ -295,8 +297,74 @@ public abstract class PhysicalOperator e
         }
     }
 
+    @Override
     public abstract void visit(PhyPlanVisitor v) throws VisitorException;
 
+    /**
+     * Implementations that call into the different versions of getNext are often
+     * identical, differing only in the signature of the getNext() call they make.
+     * This method allows to cut down on some of the copy-and-paste.
+     *
+     * @param obj The object we are working with. Its class should correspond to DataType
+     * @param dataType Describes the type of obj; a byte from DataType.
+     * @return result Result of applying this Operator to the Object.
+     * @throws ExecException
+     */
+    @SuppressWarnings("rawtypes")  // For legacy use of untemplatized Map.
+    public Result getNext(Object obj, byte dataType) throws ExecException {
+        switch (dataType) {
+        case DataType.BAG:
+            return getNext((DataBag) obj);
+        case DataType.BOOLEAN:
+            return getNext((Boolean) obj);
+        case DataType.BYTEARRAY:
+            return getNext((DataByteArray) obj);
+        case DataType.CHARARRAY:
+            return getNext((String) obj);
+        case DataType.DOUBLE:
+            return getNext((Double) obj);
+        case DataType.FLOAT:
+            return getNext((Float) obj);
+        case DataType.INTEGER:
+            return getNext((Integer) obj);
+        case DataType.LONG:
+            return getNext((Long) obj);
+        case DataType.MAP:
+            return getNext((Map) obj);
+        case DataType.TUPLE:
+            return getNext((Tuple) obj);
+        default:
+            throw new ExecException("Unsupported type for getNext: " + DataType.findTypeName(dataType));
+        }
+    }
+
+    public static Object getDummy(byte dataType) throws ExecException {  // maps a DataType byte to the matching static dummy* placeholder
+        switch (dataType) {
+        case DataType.BAG:
+            return dummyBag;
+        case DataType.BOOLEAN:
+            return dummyBool;
+        case DataType.BYTEARRAY:
+            return dummyDBA;
+        case DataType.CHARARRAY:
+            return dummyString;
+        case DataType.DOUBLE:
+            return dummyDouble;
+        case DataType.FLOAT:
+            return dummyFloat;
+        case DataType.INTEGER:
+            return dummyInt;  // previously returned dummyFloat (copy-paste slip from the FLOAT case)
+        case DataType.LONG:
+            return dummyLong;
+        case DataType.MAP:
+            return dummyMap;
+        case DataType.TUPLE:
+            return dummyTuple;
+        default:
+            throw new ExecException("Unsupported type for getDummy: " + DataType.findTypeName(dataType));
+        }
+    }
+
     public Result getNext(Integer i) throws ExecException {
         return res;
     }
@@ -337,7 +405,9 @@ public abstract class PhysicalOperator e
         Result ret = null;
         DataBag tmpBag = BagFactory.getInstance().newDefaultBag();
         for(ret = getNext(dummyTuple);ret.returnStatus!=POStatus.STATUS_EOP;ret=getNext(dummyTuple)){
-            if(ret.returnStatus == POStatus.STATUS_ERR) return ret;
+            if(ret.returnStatus == POStatus.STATUS_ERR) {
+                return ret;
+            }
             tmpBag.add((Tuple)ret.result);
         }
         ret.result = tmpBag;
@@ -361,7 +431,7 @@ public abstract class PhysicalOperator e
     }
 
     /**
-     * Make a deep copy of this operator. This function is blank, however, 
+     * Make a deep copy of this operator. This function is blank, however,
      * we should leave a place holder so that the subclasses can clone
      * @throws CloneNotSupportedException
      */
@@ -380,15 +450,15 @@ public abstract class PhysicalOperator e
     public void setParentPlan(PhysicalPlan physicalPlan) {
        parentPlan = physicalPlan;
     }
-    
+
     public Log getLogger() {
     	return log;
     }
-    
+
     public static void setPigLogger(PigLogger logger) {
     	pigLogger = logger;
     }
-    
+
     public static PigLogger getPigLogger() {
     	return pigLogger;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java Thu Dec 23 01:33:44 2010
@@ -29,7 +29,7 @@ import org.apache.pig.impl.plan.VisitorE
 public class Add extends BinaryExpressionOperator {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
 
@@ -51,125 +51,81 @@ public class Add extends BinaryExpressio
         return "Add" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
-    @Override
-    public Result getNext(Double d) throws ExecException {
-        Result r = accumChild(null, d);
+    /*
+     * This method is used to invoke the appropriate addition method, as Java does not provide generic
+     * dispatch for it.
+     */
+    @SuppressWarnings("unchecked")
+    protected <T extends Number> T add(T a, T b, byte dataType) throws ExecException {
+        switch(dataType) {
+        case DataType.DOUBLE:
+            return (T) Double.valueOf((Double) a + (Double) b);
+        case DataType.INTEGER:
+            return (T) Integer.valueOf((Integer) a + (Integer) b);
+        case DataType.LONG:
+            return (T) Long.valueOf((Long) a + (Long) b);
+        case DataType.FLOAT:
+            return (T) Float.valueOf((Float) a + (Float) b);
+        default:
+            throw new ExecException("called on unsupported Number class " + DataType.findTypeName(dataType));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T extends Number> Result genericGetNext(T number, byte dataType) throws ExecException {
+        Result r = accumChild(null, number, dataType);
         if (r != null) {
             return r;
         }
-        
+
         byte status;
         Result res;
-        Double left = null, right = null;
-        res = lhs.getNext(left);
+        T left = null, right = null;
+        res = lhs.getNext(left, dataType);
         status = res.returnStatus;
         if(status != POStatus.STATUS_OK || res.result == null) {
             return res;
         }
-        left = (Double) res.result;
-        
-        res = rhs.getNext(right);
+        left = (T) res.result;
+
+        res = rhs.getNext(right, dataType);
         status = res.returnStatus;
         if(status != POStatus.STATUS_OK || res.result == null) {
             return res;
         }
-        right = (Double) res.result;
-        
-        res.result = new Double(left + right);
+        right = (T) res.result;
+
+        res.result = add(left, right, dataType);
         return res;
     }
-    
+
+    @Override
+    public Result getNext(Double d) throws ExecException {
+        return genericGetNext(d, DataType.DOUBLE);
+    }
+
     @Override
     public Result getNext(Float f) throws ExecException {
-        Result r = accumChild(null, f);
-        if (r != null) {
-            return r;
-        }
-        
-        byte status;
-        Result res;
-        Float left = null, right = null;
-        res = lhs.getNext(left);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        left = (Float) res.result;
-        
-        res = rhs.getNext(right);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        right = (Float) res.result;
-        
-        res.result = new Float(left + right);
-        return res;
+        return genericGetNext(f, DataType.FLOAT);
     }
-    
+
     @Override
-    public Result getNext(Integer i) throws ExecException {    	
-        Result r = accumChild(null, i);
-        if (r != null) {
-            return r;
-        }
-        
-        byte status;
-        Result res;
-        Integer left = null, right = null;
-        res = lhs.getNext(left);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        left = (Integer) res.result;
-        
-        res = rhs.getNext(right);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        right = (Integer) res.result;
-        
-        res.result = Integer.valueOf(left + right);
-        return res;
+    public Result getNext(Integer i) throws ExecException {
+        return genericGetNext(i, DataType.INTEGER);
     }
-    
+
     @Override
     public Result getNext(Long l) throws ExecException {
-        Result r = accumChild(null, l);
-        if (r != null) {
-            return r;
-        }
-        
-        byte status;
-        Result res;
-        Long left = null, right = null;
-        res = lhs.getNext(left);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        left = (Long) res.result;
-        
-        res = rhs.getNext(right);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        right = (Long) res.result;
-        
-        res.result = Long.valueOf(left + right);
-        return res;
+        return genericGetNext(l, DataType.LONG);
     }
 
     @Override
     public Add clone() throws CloneNotSupportedException {
-        Add clone = new Add(new OperatorKey(mKey.scope, 
+        Add clone = new Add(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
         clone.cloneHelper(this);
         return clone;
     }
-    
+
 
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java Thu Dec 23 01:33:44 2010
@@ -26,6 +26,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -35,21 +36,21 @@ import org.apache.pig.impl.plan.VisitorE
 
 /**
  * This class implements a Constant of any type.
- * Its value can be set using the setValue method. 
+ * Its value can be set using the setValue method.
  *
  */
 public class ConstantExpression extends ExpressionOperator {
-    
+
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
 
 //    private Log log = LogFactory.getLog(getClass());
-    
+
     //The value that this constant represents
     Object value;
-    
+
     //The result of calling getNext
     Result res = new Result();
 
@@ -63,10 +64,11 @@ public class ConstantExpression extends 
 
     @Override
     public String name() {
-        if(value!=null)
+        if(value!=null) {
             return "Constant(" + value.toString() +") - " + mKey.toString();
-        else
+        } else {
             return "Constant(" + "DummyVal" +") - " + mKey.toString();
+        }
     }
 
     @Override
@@ -94,114 +96,86 @@ public class ConstantExpression extends 
         attachInput(dummyTuple);
     }
 
-    @Override
-    public Result getNext(DataBag db) throws ExecException {
+    protected Result genericGetNext(Object obj, byte dataType) throws ExecException {
         res = processInput();
-        if(res.returnStatus!=POStatus.STATUS_OK)
+        if(res.returnStatus != POStatus.STATUS_OK) {
             return res;
-        
-        res.result = (DataBag)value;
+        }
+        res.result = value;
         return res;
     }
 
     @Override
+    public Result getNext(DataBag db) throws ExecException {
+        return genericGetNext(db, DataType.BAG);
+    }
+
+    @Override
     public Result getNext(DataByteArray ba) throws ExecException {
-        res = processInput();
-        if(res.returnStatus!=POStatus.STATUS_OK)
-            return res;
-        res.result = (DataByteArray)value;
-        return res;
+        return genericGetNext(ba, DataType.BYTEARRAY);
     }
 
     @Override
     public Result getNext(Double d) throws ExecException {
-        res = processInput();
-        if(res.returnStatus!=POStatus.STATUS_OK)
-            return res;
-        res.result = (Double)value;
-        return res;
+        return genericGetNext(d, DataType.DOUBLE);
     }
 
     @Override
     public Result getNext(Float f) throws ExecException {
-        res = processInput();
-        if(res.returnStatus!=POStatus.STATUS_OK)
-            return res;
-        res.result = (Float)value;
-        return res;
+        return genericGetNext(f, DataType.FLOAT);
+
     }
 
     @Override
     public Result getNext(Integer i) throws ExecException {
-        res = processInput();
-        if(res.returnStatus!=POStatus.STATUS_OK)
-            return res;
-        res.result = (Integer)value;
-        return res;
+        return genericGetNext(i, DataType.INTEGER);
+
     }
 
     @Override
     public Result getNext(Long l) throws ExecException {
-        res = processInput();
-        if(res.returnStatus!=POStatus.STATUS_OK)
-            return res;
-        res.result = (Long)value;
-        return res;
+        return genericGetNext(l, DataType.LONG);
+
     }
 
     @Override
     public Result getNext(String s) throws ExecException {
-        res = processInput();
-        if(res.returnStatus!=POStatus.STATUS_OK)
-            return res;
-        res.result = (String)value;
-        return res;
+        return genericGetNext(s, DataType.CHARARRAY);
+
     }
 
     @Override
     public Result getNext(Tuple t) throws ExecException {
-        res = processInput();
-        if(res.returnStatus!=POStatus.STATUS_OK)
-            return res;
-        res.result = (Tuple)value;
-        return res;
+        return genericGetNext(t, DataType.TUPLE);
+
     }
-    
-    
-    
+
     @Override
     public Result getNext(Boolean b) throws ExecException {
-        res = processInput();
-        if(res.returnStatus!=POStatus.STATUS_OK)
-            return res;
-        res.result = (Boolean)value;
-        return res;
+        return genericGetNext(b, DataType.BOOLEAN);
     }
 
     @Override
     public Result getNext(Map m) throws ExecException {
-        res = processInput();
-        if(res.returnStatus!=POStatus.STATUS_OK)
-            return res;
-        res.result = (Map)value;
-        return res;
+        return genericGetNext(m, DataType.MAP);
+
     }
 
     @Override
     public ConstantExpression clone() throws CloneNotSupportedException {
         ConstantExpression clone =
-            new ConstantExpression(new OperatorKey(mKey.scope, 
+            new ConstantExpression(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
         clone.value = value;
         clone.cloneHelper(this);
         return clone;
     }
-    
+
     /**
      * Get the child expressions of this expression
      */
     @Override
-    public List<ExpressionOperator> getChildExpressions() {		
+    public List<ExpressionOperator> getChildExpressions() {
         return null;
     }
     

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java Thu Dec 23 01:33:44 2010
@@ -30,7 +30,7 @@ import org.apache.pig.impl.plan.VisitorE
 public class Divide extends BinaryExpressionOperator {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
 
@@ -52,153 +52,109 @@ public class Divide extends BinaryExpres
         return "Divide" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
-    @Override
-    public Result getNext(Double d) throws ExecException {
-        Result r = accumChild(null, d);
-        if (r != null) {
-            return r;
-        }
-        
-        byte status;
-        Result res;
-        Double left = null, right = null;
-        res = lhs.getNext(left);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        left = (Double) res.result;
-        
-        res = rhs.getNext(right);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
+    /*
+     * This method is used to invoke the appropriate method, as Java does not provide generic
+     * dispatch for it.
+     */
+    @SuppressWarnings("unchecked")
+    protected <T extends Number> T divide(T a, T b, byte dataType) throws ExecException {
+        switch (dataType) {
+        case DataType.DOUBLE:
+            return (T) Double.valueOf((Double) a / (Double) b);
+        case DataType.INTEGER:
+            return (T) Integer.valueOf((Integer) a / (Integer) b);
+        case DataType.LONG:
+            return (T) Long.valueOf((Long) a / (Long) b);
+        case DataType.FLOAT:
+            return (T) Float.valueOf((Float) a / (Float) b);
+        default:
+            throw new ExecException("called on unsupported Number class " + DataType.findTypeName(dataType));
         }
-        right = (Double) res.result;
-        
-        if (right == 0) {
-            if(pigLogger != null) {
-                pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
-            }
-            res.result = null;
+    }
+
+    /*
+     * Tests whether a boxed number is zero for the given DataType. Values are unboxed and compared
+     * as primitives so that -0.0 counts as zero, matching the "right == 0" checks this replaces.
+     */
+    protected <T extends Number> boolean equalsZero(T a, byte dataType) throws ExecException {
+        switch (dataType) {
+        case DataType.DOUBLE:
+            return ((Double) a).doubleValue() == 0.0;  // Double.equals(0.0) is false for -0.0
+        case DataType.INTEGER:
+            return ((Integer) a).intValue() == 0;
+        case DataType.LONG:
+            return ((Long) a).longValue() == 0L;
+        case DataType.FLOAT:
+            return ((Float) a).floatValue() == 0.0f;  // Float.equals(0.0f) is false for -0.0f
+        default:
+            throw new ExecException("Called on unsupported Number class " + DataType.findTypeName(dataType));
         }
-        else
-            res.result = new Double(left / right);
-        return res;
     }
-    
-    @Override
-    public Result getNext(Float f) throws ExecException {
-        Result r = accumChild(null, f);
+
+    @SuppressWarnings("unchecked")
+    protected <T extends Number> Result genericGetNext(T number, byte dataType) throws ExecException {
+        Result r = accumChild(null, number, dataType);
         if (r != null) {
             return r;
         }
-        
+
+
         byte status;
         Result res;
-        Float left = null, right = null;
-        res = lhs.getNext(left);
+        T left = null, right = null;
+        res = lhs.getNext(left, dataType);
         status = res.returnStatus;
         if(status != POStatus.STATUS_OK || res.result == null) {
             return res;
         }
-        left = (Float) res.result;
-        
-        res = rhs.getNext(right);
+        left = (T) res.result;
+
+        res = rhs.getNext(right, dataType);
         status = res.returnStatus;
         if(status != POStatus.STATUS_OK || res.result == null) {
             return res;
         }
-        right = (Float) res.result;
-        
-        if (right == 0) {
+        right = (T) res.result;
+
+        if (equalsZero(right, dataType)) {
             if(pigLogger != null) {
                 pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
             }
             res.result = null;
+        } else {
+            res.result = divide(left, right, dataType);
         }
-        else
-            res.result = new Float(left / right);
         return res;
     }
-    
+
+    @Override
+    public Result getNext(Double d) throws ExecException {
+        return genericGetNext(d, DataType.DOUBLE);
+    }
+
+    @Override
+    public Result getNext(Float f) throws ExecException {
+        return genericGetNext(f, DataType.FLOAT);
+
+    }
+
     @Override
     public Result getNext(Integer i) throws ExecException {
-        Result r = accumChild(null, i);
-        if (r != null) {
-            return r;
-        }
-        
-        byte status;
-        Result res;
-        Integer left = null, right = null;
-        res = lhs.getNext(left);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        left = (Integer) res.result;
-        
-        res = rhs.getNext(right);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        right = (Integer) res.result;
-        
-        if (right == 0) {
-            if(pigLogger != null) {
-                pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
-            }
-            res.result = null;
-        }
-        else
-            res.result = Integer.valueOf(left / right);
-        return res;
+        return genericGetNext(i, DataType.INTEGER);
     }
-    
+
     @Override
     public Result getNext(Long l) throws ExecException {
-        Result r = accumChild(null, l);
-        if (r != null) {
-            return r;
-        }
-        
-        byte status;
-        Result res;
-        Long left = null, right = null;
-        res = lhs.getNext(left);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        left = (Long) res.result;
-        
-        res = rhs.getNext(right);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        right = (Long) res.result;
-        
-        if (right == 0) {
-            if(pigLogger != null) {
-                pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
-            }
-            res.result = null;
-        }
-        else
-            res.result = Long.valueOf(left / right);
-        return res;
+        return genericGetNext(l, DataType.LONG);
     }
 
     @Override
     public Divide clone() throws CloneNotSupportedException {
-        Divide clone = new Divide(new OperatorKey(mKey.scope, 
+        Divide clone = new Divide(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
         clone.cloneHelper(this);
         return clone;
     }
-    
+
 
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java Thu Dec 23 01:33:44 2010
@@ -27,7 +27,6 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -36,7 +35,7 @@ import org.apache.pig.impl.plan.VisitorE
 public class EqualToExpr extends BinaryComparisonOperator {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
     transient private final Log log = LogFactory.getLog(getClass());
@@ -62,105 +61,48 @@ public class EqualToExpr extends BinaryC
 
     @Override
     public Result getNext(Boolean bool) throws ExecException {
-        byte status;
         Result left, right;
 
         switch (operandType) {
-        case DataType.BYTEARRAY: {
-            Result r = accumChild(null, dummyDBA);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyDBA);
-            right = rhs.getNext(dummyDBA);
-            return doComparison(left, right);
-                            }
-
-        case DataType.DOUBLE: {
-            Result r = accumChild(null, dummyDouble);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyDouble);
-            right = rhs.getNext(dummyDouble);
-            return doComparison(left, right);
-                            }
-
-        case DataType.FLOAT: {
-            Result r = accumChild(null, dummyFloat);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyFloat);
-            right = rhs.getNext(dummyFloat);
-            return doComparison(left, right);
-                            }
-
-        case DataType.INTEGER: {
-            Result r = accumChild(null, dummyInt);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyInt);
-            right = rhs.getNext(dummyInt);
-            return doComparison(left, right);
-                            }
-
-        case DataType.LONG: {
-            Result r = accumChild(null, dummyLong);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyLong);
-            right = rhs.getNext(dummyLong);
-            return doComparison(left, right);
-                            }
-
-        case DataType.CHARARRAY: {
-            Result r = accumChild(null, dummyString);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyString);
-            right = rhs.getNext(dummyString);
-            return doComparison(left, right);
-                            }
-
-        case DataType.TUPLE: {
-            Result r = accumChild(null, dummyTuple);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyTuple);
-            right = rhs.getNext(dummyTuple);
-            return doComparison(left, right);
-                            }
-        
+        case DataType.BYTEARRAY:
+        case DataType.DOUBLE:
+        case DataType.FLOAT:
+        case DataType.INTEGER:
+        case DataType.LONG:
+        case DataType.CHARARRAY:
+        case DataType.TUPLE:
         case DataType.MAP: {
-            Result r = accumChild(null, dummyMap);
+            Object dummy = getDummy(operandType);
+            Result r = accumChild(null, dummy, operandType);
             if (r != null) {
                 return r;
             }
-            left = lhs.getNext(dummyMap);
-            right = rhs.getNext(dummyMap);
+            left = lhs.getNext(dummy, operandType);
+            right = rhs.getNext(dummy, operandType);
             return doComparison(left, right);
-                            }
-            
+        }
+
         default: {
             int errCode = 2067;
             String msg = this.getClass().getSimpleName() + " does not know how to " +
             "handle type: " + DataType.findTypeName(operandType);
             throw new ExecException(msg, errCode, PigException.BUG);
         }
-        
+
         }
     }
 
     @SuppressWarnings("unchecked")
     private Result doComparison(Result left, Result right) throws ExecException {
-        if (trueRef == null) initializeRefs();
-        if (left.returnStatus != POStatus.STATUS_OK) return left;
-        if (right.returnStatus != POStatus.STATUS_OK) return right;
+        if (trueRef == null) {
+            initializeRefs();
+        }
+        if (left.returnStatus != POStatus.STATUS_OK) {
+            return left;
+        }
+        if (right.returnStatus != POStatus.STATUS_OK) {
+            return right;
+        }
         // if either operand is null, the result should be
         // null
         if(left.result == null || right.result == null) {
@@ -168,9 +110,9 @@ public class EqualToExpr extends BinaryC
             left.returnStatus = POStatus.STATUS_NULL;
             return left;
         }
-        
+
         if (left.result instanceof Comparable && right.result instanceof Comparable){
-            if (((Comparable)left.result).compareTo((Comparable)right.result) == 0) {
+            if (((Comparable)left.result).compareTo(right.result) == 0) {
                 left.result = trueRef;
             } else {
                 left.result = falseRef;
@@ -178,10 +120,11 @@ public class EqualToExpr extends BinaryC
         }else if (left.result instanceof HashMap && right.result instanceof HashMap){
             HashMap leftMap=(HashMap)left.result;
             HashMap rightMap=(HashMap)right.result;
-            if (leftMap.equals(rightMap))
+            if (leftMap.equals(rightMap)) {
                 left.result = trueRef;
-            else
+            } else {
                 left.result = falseRef;
+            }
         }else{
             throw new ExecException("The left side and right side has the different types");
         }
@@ -191,7 +134,7 @@ public class EqualToExpr extends BinaryC
 
     @Override
     public EqualToExpr clone() throws CloneNotSupportedException {
-        EqualToExpr clone = new EqualToExpr(new OperatorKey(mKey.scope, 
+        EqualToExpr clone = new EqualToExpr(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
         clone.cloneHelper(this);
         return clone;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java Thu Dec 23 01:33:44 2010
@@ -27,10 +27,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -46,7 +46,7 @@ import org.apache.pig.pen.Illustrator;
 public abstract class ExpressionOperator extends PhysicalOperator {
     private static final long serialVersionUID = 1L;
     protected Log log = LogFactory.getLog(getClass());
-    
+
     public ExpressionOperator(OperatorKey k) {
         this(k,-1);
     }
@@ -54,7 +54,7 @@ public abstract class ExpressionOperator
     public ExpressionOperator(OperatorKey k, int rp) {
         super(k, rp);
     }
-    
+
     @Override
     public void setIllustrator(Illustrator illustrator) {
         this.illustrator = illustrator;
@@ -64,14 +64,15 @@ public abstract class ExpressionOperator
     public boolean supportsMultipleOutputs() {
         return false;
     }
-    
+
     @Override
     public Result getNext(DataBag db) throws ExecException {
         return new Result();
     }
-    
+
+    @Override
     public abstract void visit(PhyPlanVisitor v) throws VisitorException;
-    
+
 
     /**
      * Make a deep copy of this operator.  This is declared here to make it
@@ -91,16 +92,16 @@ public abstract class ExpressionOperator
      * expression must be called if they have any UDF to drive the UDF.accumulate()
      */
     protected abstract List<ExpressionOperator> getChildExpressions();
-    
+
     /** check whether this expression contains any UDF
      * this is called if reducer is run as accumulative mode
-     * in this case, all UDFs must be called 
+     * in this case, all UDFs must be called
      */
     public boolean containUDF() {
         if (this instanceof POUserFunc) {
             return true;
         }
-        
+
         List<ExpressionOperator> l = getChildExpressions();
         if (l != null) {
             for(ExpressionOperator e: l) {
@@ -109,297 +110,110 @@ public abstract class ExpressionOperator
                 }
             }
         }
-        
+
         return false;
     }
 
+
     /**
      * Drive all the UDFs in accumulative mode
      */
-    protected Result accumChild(List<ExpressionOperator> child, Double d) throws ExecException {
+    protected Result accumChild(List<ExpressionOperator> child, Object o, byte dataType) throws ExecException {
         if (isAccumStarted()) {
             if (child == null) {
                 child = getChildExpressions();
             }
             Result res = null;
-            if (child != null) {        		
-                for(ExpressionOperator e: child) {        			
-                    if (e.containUDF()) {    
-                        res = e.getNext(d);
+            if (child != null) {
+                for(ExpressionOperator e: child) {
+                    if (e.containUDF()) {
+                        res = e.getNext(o, dataType);
                         if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
                             return res;
                         }
                     }
                 }
             }
-            
+
             res = new Result();
             res.returnStatus = POStatus.STATUS_BATCH_OK;
-            
+
             return res;
         }
-        
+
         return null;
     }
-       
+
     /**
      * Drive all the UDFs in accumulative mode
      */
-    protected Result accumChild(List<ExpressionOperator> child, Integer v) throws ExecException {    	
-        if (isAccumStarted()) {    		
-            if (child == null) {
-                child = getChildExpressions();
-            }
-            Result res = null;
-            if (child != null) {        		
-                for(ExpressionOperator e: child) {        			
-                    if (e.containUDF()) {            				
-                        res = e.getNext(v);
-                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
-                            return res;
-                        }
-                    }
-                }
-            }
-            
-            res = new Result();
-            res.returnStatus = POStatus.STATUS_BATCH_OK;
-            
-            return res;
-        }
-        
-        return null;
+    protected Result accumChild(List<ExpressionOperator> child, Double d) throws ExecException {
+        return accumChild(child, d, DataType.DOUBLE);
     }
-       
+
+    /**
+     * Drive all the UDFs in accumulative mode
+     */
+    protected Result accumChild(List<ExpressionOperator> child, Integer v) throws ExecException {
+        return accumChild(child, v, DataType.INTEGER);
+    }
+
+
     /**
      * Drive all the UDFs in accumulative mode
      */
     protected Result accumChild(List<ExpressionOperator> child, Long l) throws ExecException {
-        if (isAccumStarted()) {
-            if (child == null) {
-                child = getChildExpressions();
-            }
-            Result res = null;
-            if (child != null) {        		
-                for(ExpressionOperator e: child) {        			
-                    if (e.containUDF()) {    
-                        res = e.getNext(l);
-                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
-                            return res;
-                        }
-                    }
-                }
-            }
-            
-            res = new Result();
-            res.returnStatus = POStatus.STATUS_BATCH_OK;
-            
-            return res;
-        }
-        
-        return null;
+        return accumChild(child, l, DataType.LONG);
     }
-    
+
     /**
      * Drive all the UDFs in accumulative mode
      */
     protected Result accumChild(List<ExpressionOperator> child, Float f) throws ExecException {
-        if (isAccumStarted()) {
-            if (child == null) {
-                child = getChildExpressions();
-            }
-            Result res = null;
-            if (child != null) {        		
-                for(ExpressionOperator e: child) {        			
-                    if (e.containUDF()) {    
-                        res = e.getNext(f);
-                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
-                            return res;
-                        }
-                    }
-                }
-            }
-            
-            res = new Result();
-            res.returnStatus = POStatus.STATUS_BATCH_OK;
-            
-            return res;
-        }
-        
-        return null;
+        return accumChild(child, f, DataType.FLOAT);
     }
-       
+
     /**
      * Drive all the UDFs in accumulative mode
      */
     protected Result accumChild(List<ExpressionOperator> child, Boolean b) throws ExecException {
-        if (isAccumStarted()) {
-            if (child == null) {
-                child = getChildExpressions();
-            }
-            Result res = null;
-            if (child != null) {        		
-                for(ExpressionOperator e: child) {           			
-                    if (e.containUDF()) {    
-                        res = e.getNext(b);
-                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
-                            return res;
-                        }
-                    }
-                }
-            }
-            
-            res = new Result();
-            res.returnStatus = POStatus.STATUS_BATCH_OK;
-            
-            return res;
-        }
-        
-        return null;
+        return accumChild(child, b, DataType.BOOLEAN);
+
     }
-    
+
     /**
      * Drive all the UDFs in accumulative mode
      */
     protected Result accumChild(List<ExpressionOperator> child, String s) throws ExecException {
-        if (isAccumStarted()) {
-            if (child == null) {
-                child = getChildExpressions();
-            }
-            Result res = null;
-            if (child != null) {        		
-                for(ExpressionOperator e: child) {        			
-                    if (e.containUDF()) {    
-                        res = e.getNext(s);
-                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
-                            return res;
-                        }
-                    }
-                }
-            }
-            
-            res = new Result();
-            res.returnStatus = POStatus.STATUS_BATCH_OK;
-            
-            return res;
-        }
-        
-        return null;
+        return accumChild(child, s, DataType.CHARARRAY);
+
     }
-       
+
     /**
      * Drive all the UDFs in accumulative mode
      */
     protected Result accumChild(List<ExpressionOperator> child, DataByteArray dba) throws ExecException {
-        if (isAccumStarted()) {
-            if (child == null) {
-                child = getChildExpressions();
-            }
-            Result res = null;
-            if (child != null) {        		
-                for(ExpressionOperator e: child) {        			
-                    if (e.containUDF()) {    
-                        res = e.getNext(dba);
-                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
-                            return res;
-                        }
-                    }
-                }
-            }
-            
-            res = new Result();
-            res.returnStatus = POStatus.STATUS_BATCH_OK;
-            
-            return res;
-        }
-        
-        return null;
+        return accumChild(child, dba, DataType.BYTEARRAY);
     }
-    
+
     /**
      * Drive all the UDFs in accumulative mode
      */
     protected Result accumChild(List<ExpressionOperator> child, Map map) throws ExecException {
-        if (isAccumStarted()) {
-            if (child == null) {
-                child = getChildExpressions();
-            }
-            Result res = null;
-            if (child != null) {        		
-                for(ExpressionOperator e: child) {        			
-                    if (e.containUDF()) {    
-                        res = e.getNext(map);
-                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
-                            return res;
-                        }
-                    }
-                }
-            }
-            
-            res = new Result();
-            res.returnStatus = POStatus.STATUS_BATCH_OK;
-            
-            return res;
-        }
-        
-        return null;
+        return accumChild(child, map, DataType.MAP);
     }
-    
+
     /**
      * Drive all the UDFs in accumulative mode
      */
     protected Result accumChild(List<ExpressionOperator> child, Tuple t) throws ExecException {
-        if (isAccumStarted()) {
-            if (child == null) {
-                child = getChildExpressions();
-            }
-            Result res = null;
-            if (child != null) {        		
-                for(ExpressionOperator e: child) {        			
-                    if (e.containUDF()) {    
-                        res = e.getNext(t);
-                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
-                            return res;
-                        }
-                    }
-                }
-            }
-            
-            res = new Result();
-            res.returnStatus = POStatus.STATUS_BATCH_OK;
-            
-            return res;
-        }
-        
-        return null;
+        return accumChild(child, t, DataType.TUPLE);
     }
-    
+
     /**
      * Drive all the UDFs in accumulative mode
      */
     protected Result accumChild(List<ExpressionOperator> child, DataBag db) throws ExecException {
-        if (isAccumStarted()) {
-            if (child == null) {
-                child = getChildExpressions();
-            }
-            Result res = null;
-            if (child != null) {        		
-                for(ExpressionOperator e: child) {        			
-                    if (e.containUDF()) {    
-                        res = e.getNext(db);
-                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
-                            return res;
-                        }
-                    }
-                }
-            }
-            
-            res = new Result();
-            res.returnStatus = POStatus.STATUS_BATCH_OK;
-            
-            return res;
-        }
-        
-        return null;
+        return accumChild(child, db, DataType.BAG);
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java Thu Dec 23 01:33:44 2010
@@ -25,7 +25,6 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -34,7 +33,7 @@ import org.apache.pig.impl.plan.VisitorE
 public class GTOrEqualToExpr extends BinaryComparisonOperator {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
     transient private final Log log = LogFactory.getLog(getClass());
@@ -60,70 +59,24 @@ public class GTOrEqualToExpr extends Bin
 
     @Override
     public Result getNext(Boolean bool) throws ExecException {
-        byte status;
         Result left, right;
 
         switch (operandType) {
-        case DataType.BYTEARRAY: {
-            Result r = accumChild(null, dummyDBA);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyDBA);
-            right = rhs.getNext(dummyDBA);
-            return doComparison(left, right);
-                            }
-
-        case DataType.DOUBLE: {
-            Result r = accumChild(null, dummyDouble);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyDouble);
-            right = rhs.getNext(dummyDouble);
-            return doComparison(left, right);
-                            }
-
-        case DataType.FLOAT: {
-            Result r = accumChild(null, dummyFloat);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyFloat);
-            right = rhs.getNext(dummyFloat);
-            return doComparison(left, right);
-                            }
-
-        case DataType.INTEGER: {
-            Result r = accumChild(null, dummyInt);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyInt);
-            right = rhs.getNext(dummyInt);
-            return doComparison(left, right);
-                            }
-
-        case DataType.LONG: {
-            Result r = accumChild(null, dummyLong);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyLong);
-            right = rhs.getNext(dummyLong);
-            return doComparison(left, right);
-                            }
-
+        case DataType.BYTEARRAY:
+        case DataType.DOUBLE:
+        case DataType.FLOAT:
+        case DataType.INTEGER:
+        case DataType.LONG:
         case DataType.CHARARRAY: {
-            Result r = accumChild(null, dummyString);
+            Object dummy = getDummy(operandType);
+            Result r = accumChild(null, dummy, operandType);
             if (r != null) {
                 return r;
             }
-            left = lhs.getNext(dummyString);
-            right = rhs.getNext(dummyString);
+            left = lhs.getNext(dummy, operandType);
+            right = rhs.getNext(dummy, operandType);
             return doComparison(left, right);
-                            }
-
+        }
 
         default: {
             int errCode = 2067;
@@ -131,15 +84,21 @@ public class GTOrEqualToExpr extends Bin
             "handle type: " + DataType.findTypeName(operandType);
             throw new ExecException(msg, errCode, PigException.BUG);
         }
-        
+
         }
     }
 
     @SuppressWarnings("unchecked")
     private Result doComparison(Result left, Result right) {
-        if (trueRef == null) initializeRefs();
-        if (left.returnStatus != POStatus.STATUS_OK) return left;
-        if (right.returnStatus != POStatus.STATUS_OK) return right;
+        if (trueRef == null) {
+            initializeRefs();
+        }
+        if (left.returnStatus != POStatus.STATUS_OK) {
+            return left;
+        }
+        if (right.returnStatus != POStatus.STATUS_OK) {
+            return right;
+        }
         // if either operand is null, the result should be
         // null
         if(left.result == null || right.result == null) {
@@ -149,7 +108,7 @@ public class GTOrEqualToExpr extends Bin
         }
         assert(left.result instanceof Comparable);
         assert(right.result instanceof Comparable);
-        if (((Comparable)left.result).compareTo((Comparable)right.result) >= 0) {
+        if (((Comparable)left.result).compareTo(right.result) >= 0) {
             left.result = trueRef;
         } else {
             left.result = falseRef;
@@ -160,7 +119,7 @@ public class GTOrEqualToExpr extends Bin
 
     @Override
     public GTOrEqualToExpr clone() throws CloneNotSupportedException {
-        GTOrEqualToExpr clone = new GTOrEqualToExpr(new OperatorKey(mKey.scope, 
+        GTOrEqualToExpr clone = new GTOrEqualToExpr(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
         clone.cloneHelper(this);
         return clone;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java Thu Dec 23 01:33:44 2010
@@ -30,7 +30,7 @@ import org.apache.pig.impl.plan.VisitorE
 public class GreaterThanExpr extends BinaryComparisonOperator {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
 
@@ -58,82 +58,42 @@ public class GreaterThanExpr extends Bin
         Result left, right;
 
         switch (operandType) {
-        case DataType.BYTEARRAY: {
-            Result r = accumChild(null, dummyDBA);
-            if (r != null) {
-                return r;
-            }        	
-            left = lhs.getNext(dummyDBA);
-            right = rhs.getNext(dummyDBA);
-            return doComparison(left, right);
-                            }
-
-        case DataType.DOUBLE: {
-            Result r = accumChild(null, dummyDouble);
-            if (r != null) {
-                return r;
-            }        	
-            left = lhs.getNext(dummyDouble);
-            right = rhs.getNext(dummyDouble);
-            return doComparison(left, right);
-                            }
-
-        case DataType.FLOAT: {
-            Result r = accumChild(null, dummyFloat);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyFloat);
-            right = rhs.getNext(dummyFloat);
-            return doComparison(left, right);
-                            }
-
-        case DataType.INTEGER: {
-            Result r = accumChild(null, dummyInt);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyInt);
-            right = rhs.getNext(dummyInt);
-            return doComparison(left, right);
-                            }
-
-        case DataType.LONG: {
-            Result r = accumChild(null, dummyLong);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyLong);
-            right = rhs.getNext(dummyLong);
-            return doComparison(left, right);
-                            }
-
+        case DataType.BYTEARRAY:
+        case DataType.DOUBLE:
+        case DataType.FLOAT:
+        case DataType.INTEGER:
+        case DataType.LONG:
         case DataType.CHARARRAY: {
-            Result r = accumChild(null, dummyString);
+            Object dummy = getDummy(operandType);
+            Result r = accumChild(null, dummy, operandType);
             if (r != null) {
                 return r;
             }
-            left = lhs.getNext(dummyString);
-            right = rhs.getNext(dummyString);
+            left = lhs.getNext(dummy, operandType);
+            right = rhs.getNext(dummy, operandType);
             return doComparison(left, right);
-                            }
-
-
+        }
         default: {
             int errCode = 2067;
             String msg = this.getClass().getSimpleName() + " does not know how to " +
             "handle type: " + DataType.findTypeName(operandType);
             throw new ExecException(msg, errCode, PigException.BUG);
         }
-        
+
         }
     }
 
     @SuppressWarnings("unchecked")
     private Result doComparison(Result left, Result right) {
-        if (trueRef == null) initializeRefs();
-        if (left.returnStatus != POStatus.STATUS_OK) return left;
-        if (right.returnStatus != POStatus.STATUS_OK) return right;
+        if (trueRef == null) {
+            initializeRefs();
+        }
+        if (left.returnStatus != POStatus.STATUS_OK) {
+            return left;
+        }
+        if (right.returnStatus != POStatus.STATUS_OK) {
+            return right;
+        }
         // if either operand is null, the result should be
         // null
         if(left.result == null || right.result == null) {
@@ -143,7 +103,7 @@ public class GreaterThanExpr extends Bin
         }
         assert(left.result instanceof Comparable);
         assert(right.result instanceof Comparable);
-        if (((Comparable)left.result).compareTo((Comparable)right.result) > 0) {
+        if (((Comparable)left.result).compareTo(right.result) > 0) {
             left.result = trueRef;
         } else {
             left.result = falseRef;
@@ -154,7 +114,7 @@ public class GreaterThanExpr extends Bin
 
     @Override
     public GreaterThanExpr clone() throws CloneNotSupportedException {
-        GreaterThanExpr clone = new GreaterThanExpr(new OperatorKey(mKey.scope, 
+        GreaterThanExpr clone = new GreaterThanExpr(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
         clone.cloneHelper(this);
         return clone;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java Thu Dec 23 01:33:44 2010
@@ -25,7 +25,6 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -34,7 +33,7 @@ import org.apache.pig.impl.plan.VisitorE
 public class LTOrEqualToExpr extends BinaryComparisonOperator {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
     transient private final Log log = LogFactory.getLog(getClass());
@@ -60,86 +59,45 @@ public class LTOrEqualToExpr extends Bin
 
     @Override
     public Result getNext(Boolean bool) throws ExecException {
-        byte status;
         Result left, right;
 
         switch (operandType) {
-        case DataType.BYTEARRAY: {
-            Result r = accumChild(null, dummyDBA);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyDBA);
-            right = rhs.getNext(dummyDBA);
-            return doComparison(left, right);
-                            }
-
-        case DataType.DOUBLE: {
-            Result r = accumChild(null, dummyDouble);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyDouble);
-            right = rhs.getNext(dummyDouble);
-            return doComparison(left, right);
-                            }
-
-        case DataType.FLOAT: {
-            Result r = accumChild(null, dummyFloat);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyFloat);
-            right = rhs.getNext(dummyFloat);
-            return doComparison(left, right);
-                            }
-
-        case DataType.INTEGER: {
-            Result r = accumChild(null, dummyInt);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyInt);
-            right = rhs.getNext(dummyInt);
-            return doComparison(left, right);
-                            }
-
-        case DataType.LONG: {
-            Result r = accumChild(null, dummyLong);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyLong);
-            right = rhs.getNext(dummyLong);
-            return doComparison(left, right);
-                            }
-
+        case DataType.BYTEARRAY:
+        case DataType.DOUBLE:
+        case DataType.FLOAT:
+        case DataType.INTEGER:
+        case DataType.LONG:
         case DataType.CHARARRAY: {
-            Result r = accumChild(null, dummyString);
+            Object dummy = getDummy(operandType);
+            Result r = accumChild(null, dummy, operandType);
             if (r != null) {
                 return r;
             }
-            left = lhs.getNext(dummyString);
-            right = rhs.getNext(dummyString);
+            left = lhs.getNext(dummy, operandType);
+            right = rhs.getNext(dummy, operandType);
             return doComparison(left, right);
-                            }
-
-
+        }
         default: {
             int errCode = 2067;
             String msg = this.getClass().getSimpleName() + " does not know how to " +
             "handle type: " + DataType.findTypeName(operandType);
             throw new ExecException(msg, errCode, PigException.BUG);
         }
-        
+
         }
     }
 
     @SuppressWarnings("unchecked")
     private Result doComparison(Result left, Result right) {
-        if (trueRef == null) initializeRefs();
-        if (left.returnStatus != POStatus.STATUS_OK) return left;
-        if (right.returnStatus != POStatus.STATUS_OK) return right;
+        if (trueRef == null) {
+            initializeRefs();
+        }
+        if (left.returnStatus != POStatus.STATUS_OK) {
+            return left;
+        }
+        if (right.returnStatus != POStatus.STATUS_OK) {
+            return right;
+        }
         // if either operand is null, the result should be
         // null
         if(left.result == null || right.result == null) {
@@ -149,7 +107,7 @@ public class LTOrEqualToExpr extends Bin
         }
         assert(left.result instanceof Comparable);
         assert(right.result instanceof Comparable);
-        if (((Comparable)left.result).compareTo((Comparable)right.result) <= 0) {
+        if (((Comparable)left.result).compareTo(right.result) <= 0) {
             left.result = trueRef;
         } else {
             left.result = falseRef;
@@ -160,7 +118,7 @@ public class LTOrEqualToExpr extends Bin
 
     @Override
     public LTOrEqualToExpr clone() throws CloneNotSupportedException {
-        LTOrEqualToExpr clone = new LTOrEqualToExpr(new OperatorKey(mKey.scope, 
+        LTOrEqualToExpr clone = new LTOrEqualToExpr(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
         clone.cloneHelper(this);
         return clone;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java Thu Dec 23 01:33:44 2010
@@ -24,7 +24,6 @@ import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -34,7 +33,7 @@ import org.apache.pig.backend.executione
 public class LessThanExpr extends BinaryComparisonOperator {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
     transient private final Log log = LogFactory.getLog(getClass());
@@ -60,70 +59,24 @@ public class LessThanExpr extends Binary
 
     @Override
     public Result getNext(Boolean bool) throws ExecException {
-        byte status;
         Result left, right;
 
         switch (operandType) {
-        case DataType.BYTEARRAY: {
-            Result r = accumChild(null, dummyDBA);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyDBA);
-            right = rhs.getNext(dummyDBA);
-            return doComparison(left, right);
-                            }
-
-        case DataType.DOUBLE: {
-            Result r = accumChild(null, dummyDouble);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyDouble);
-            right = rhs.getNext(dummyDouble);
-            return doComparison(left, right);
-                            }
-
-        case DataType.FLOAT: {
-            Result r = accumChild(null, dummyFloat);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyFloat);
-            right = rhs.getNext(dummyFloat);
-            return doComparison(left, right);
-                            }
-
-        case DataType.INTEGER: {
-            Result r = accumChild(null, dummyInt);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyInt);
-            right = rhs.getNext(dummyInt);
-            return doComparison(left, right);
-                            }
-
-        case DataType.LONG: {
-            Result r = accumChild(null, dummyLong);
-            if (r != null) {
-                return r;
-            }
-            left = lhs.getNext(dummyLong);
-            right = rhs.getNext(dummyLong);
-            return doComparison(left, right);
-                            }
-
+        case DataType.BYTEARRAY:
+        case DataType.DOUBLE:
+        case DataType.FLOAT:
+        case DataType.INTEGER:
+        case DataType.LONG:
         case DataType.CHARARRAY: {
-            Result r = accumChild(null, dummyString);
+            Object dummy = getDummy(operandType);
+            Result r = accumChild(null, dummy, operandType);
             if (r != null) {
                 return r;
             }
-            left = lhs.getNext(dummyString);
-            right = rhs.getNext(dummyString);
+            left = lhs.getNext(dummy, operandType);
+            right = rhs.getNext(dummy, operandType);
             return doComparison(left, right);
-                            }
-
+        }
 
         default: {
             int errCode = 2067;
@@ -131,15 +84,21 @@ public class LessThanExpr extends Binary
             "handle type: " + DataType.findTypeName(operandType);
             throw new ExecException(msg, errCode, PigException.BUG);
         }
-        
+
         }
     }
 
     @SuppressWarnings("unchecked")
     private Result doComparison(Result left, Result right) {
-        if (trueRef == null) initializeRefs();
-        if (left.returnStatus != POStatus.STATUS_OK) return left;
-        if (right.returnStatus != POStatus.STATUS_OK) return right;
+        if (trueRef == null) {
+            initializeRefs();
+        }
+        if (left.returnStatus != POStatus.STATUS_OK) {
+            return left;
+        }
+        if (right.returnStatus != POStatus.STATUS_OK) {
+            return right;
+        }
         // if either operand is null, the result should be
         // null
         if(left.result == null || right.result == null) {
@@ -149,7 +108,7 @@ public class LessThanExpr extends Binary
         }
         assert(left.result instanceof Comparable);
         assert(right.result instanceof Comparable);
-        if (((Comparable)left.result).compareTo((Comparable)right.result) < 0) {
+        if (((Comparable)left.result).compareTo(right.result) < 0) {
             left.result = trueRef;
         } else {
             left.result = falseRef;
@@ -160,7 +119,7 @@ public class LessThanExpr extends Binary
 
     @Override
     public LessThanExpr clone() throws CloneNotSupportedException {
-        LessThanExpr clone = new LessThanExpr(new OperatorKey(mKey.scope, 
+        LessThanExpr clone = new LessThanExpr(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
         clone.cloneHelper(this);
         return clone;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java Thu Dec 23 01:33:44 2010
@@ -29,7 +29,7 @@ import org.apache.pig.impl.plan.VisitorE
 public class Mod extends BinaryExpressionOperator {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
 
@@ -40,7 +40,7 @@ public class Mod extends BinaryExpressio
     public Mod(OperatorKey k, int rp) {
         super(k, rp);
     }
-    
+
     @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitMod(this);
@@ -50,66 +50,60 @@ public class Mod extends BinaryExpressio
     public String name() {
         return "Mod" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
-    
-    @Override
-    public Result getNext(Integer i) throws ExecException{
-        Result r = accumChild(null, i);
+
+    @SuppressWarnings("unchecked")
+    protected <T extends Number> T mod(T a, T b, byte dataType) throws ExecException {
+        switch(dataType) {
+        case DataType.INTEGER:
+            return (T) Integer.valueOf((Integer) a % (Integer) b);
+        case DataType.LONG:
+            return (T) Long.valueOf((Long) a % (Long) b);
+        default:
+            throw new ExecException("called on unsupported Number class " + DataType.findTypeName(dataType));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T extends Number> Result genericGetNext(T number, byte dataType) throws ExecException {
+        Result r = accumChild(null, number, dataType);
         if (r != null) {
             return r;
         }
-        
+
         byte status;
         Result res;
-        Integer left = null, right = null;
-        res = lhs.getNext(left);
+        T left = null, right = null;
+        res = lhs.getNext(left, dataType);
         status = res.returnStatus;
         if(status != POStatus.STATUS_OK || res.result == null) {
             return res;
         }
-        left = (Integer) res.result;
-        
-        res = rhs.getNext(right);
+        left = (T) res.result;
+
+        res = rhs.getNext(right, dataType);
         status = res.returnStatus;
         if(status != POStatus.STATUS_OK || res.result == null) {
             return res;
         }
-        right = (Integer) res.result;
-        
-        res.result = Integer.valueOf(left % right);
+        right = (T) res.result;
+
+        res.result = mod(left, right, dataType);
         return res;
     }
-    
+
+    @Override
+    public Result getNext(Integer i) throws ExecException{
+        return genericGetNext(i, DataType.INTEGER);
+    }
+
     @Override
     public Result getNext(Long i) throws ExecException{
-        Result r = accumChild(null, i);
-        if (r != null) {
-            return r;
-        }
-        
-        byte status;
-        Result res;
-        Long left = null, right = null;
-        res = lhs.getNext(left);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        left = (Long) res.result;
-        
-        res = rhs.getNext(right);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        right = (Long) res.result;
-        
-        res.result = Long.valueOf(left % right);
-        return res;
+        return genericGetNext(i, DataType.LONG);
     }
 
     @Override
     public Mod clone() throws CloneNotSupportedException {
-        Mod clone = new Mod(new OperatorKey(mKey.scope, 
+        Mod clone = new Mod(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
         clone.cloneHelper(this);
         return clone;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java Thu Dec 23 01:33:44 2010
@@ -29,7 +29,7 @@ import org.apache.pig.impl.plan.VisitorE
 public class Multiply extends BinaryExpressionOperator {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
 
@@ -51,125 +51,77 @@ public class Multiply extends BinaryExpr
         return "Multiply" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
 
-    @Override
-    public Result getNext(Double d) throws ExecException {
-        Result r = accumChild(null, d);
+    @SuppressWarnings("unchecked")
+    protected <T extends Number> T multiply(T a, T b, byte dataType) throws ExecException {
+        switch(dataType) {
+        case DataType.DOUBLE:
+            return (T) Double.valueOf((Double) a * (Double) b);
+        case DataType.INTEGER:
+            return (T) Integer.valueOf((Integer) a * (Integer) b);
+        case DataType.LONG:
+            return (T) Long.valueOf((Long) a * (Long) b);
+        case DataType.FLOAT:
+            return (T) Float.valueOf((Float) a * (Float) b);
+        default:
+            throw new ExecException("called on unsupported Number class " + DataType.findTypeName(dataType));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T extends Number> Result genericGetNext(T number, byte dataType) throws ExecException {
+        Result r = accumChild(null, number, dataType);
         if (r != null) {
             return r;
         }
-        
+
         byte status;
         Result res;
-        Double left = null, right = null;
-        res = lhs.getNext(left);
+        T left = null, right = null;
+        res = lhs.getNext(left, dataType);
         status = res.returnStatus;
         if(status != POStatus.STATUS_OK || res.result == null) {
             return res;
         }
-        left = (Double) res.result;
-        
-        res = rhs.getNext(right);
+        left = (T) res.result;
+
+        res = rhs.getNext(right, dataType);
         status = res.returnStatus;
         if(status != POStatus.STATUS_OK || res.result == null) {
             return res;
         }
-        right = (Double) res.result;
-        
-        res.result = new Double(left * right);
+        right = (T) res.result;
+
+        res.result = multiply(left, right, dataType);
         return res;
     }
-    
+
+    @Override
+    public Result getNext(Double d) throws ExecException {
+        return genericGetNext(d, DataType.DOUBLE);
+    }
+
     @Override
     public Result getNext(Float f) throws ExecException {
-        Result r = accumChild(null, f);
-        if (r != null) {
-            return r;
-        }
-        
-        byte status;
-        Result res;
-        Float left = null, right = null;
-        res = lhs.getNext(left);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        left = (Float) res.result;
-        
-        res = rhs.getNext(right);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        right = (Float) res.result;
-        
-        res.result = new Float(left * right);
-        return res;
+        return genericGetNext(f, DataType.FLOAT);
     }
-    
+
     @Override
     public Result getNext(Integer i) throws ExecException {
-        Result r = accumChild(null, i);
-        if (r != null) {
-            return r;
-        }
-        
-        byte status;
-        Result res;
-        Integer left = null, right = null;
-        res = lhs.getNext(left);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        left = (Integer) res.result;
-        
-        res = rhs.getNext(right);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        right = (Integer) res.result;
-        
-        res.result = Integer.valueOf(left * right);
-        return res;
+        return genericGetNext(i, DataType.INTEGER);
     }
-    
+
     @Override
     public Result getNext(Long l) throws ExecException {
-        Result r = accumChild(null, l);
-        if (r != null) {
-            return r;
-        }
-        
-        byte status;
-        Result res;
-        Long left = null, right = null;
-        res = lhs.getNext(left);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        left = (Long) res.result;
-        
-        res = rhs.getNext(right);
-        status = res.returnStatus;
-        if(status != POStatus.STATUS_OK || res.result == null) {
-            return res;
-        }
-        right = (Long) res.result;
-        
-        res.result = Long.valueOf(left * right);
-        return res;
+        return genericGetNext(l, DataType.LONG);
     }
 
     @Override
     public Multiply clone() throws CloneNotSupportedException {
-        Multiply clone = new Multiply(new OperatorKey(mKey.scope, 
+        Multiply clone = new Multiply(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
         clone.cloneHelper(this);
         return clone;
     }
-    
-    
+
+
 }