You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2009/11/12 19:33:18 UTC

svn commit: r835487 [1/3] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengin...

Author: daijy
Date: Thu Nov 12 18:33:15 2009
New Revision: 835487

URL: http://svn.apache.org/viewvc?rev=835487&view=rev
Log:
PIG-979: Accumulator Interface for UDFs

Added:
    hadoop/pig/trunk/src/org/apache/pig/Accumulator.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java
    hadoop/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulativeSumBag.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulatorBagCount.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/BagCount.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Nov 12 18:33:15 2009
@@ -113,6 +113,8 @@
 
 PIG-1038: Optimize nested distinct/sort to use secondary key (daijy)
 
+PIG-979: Accumulator Interface for UDFs (yinghe via daijy)
+
 OPTIMIZATIONS
 
 BUG FIXES

Added: hadoop/pig/trunk/src/org/apache/pig/Accumulator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Accumulator.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Accumulator.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/Accumulator.java Thu Nov 12 18:33:15 2009
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.data.Tuple;
+
+public interface Accumulator <T> {
+    /**
+     * Pass tuples for the current key to the UDF.
+     * @param b tuple whose DataBags, read with b.get(index), each hold 0 to many tuples
+     */
+    public void accumulate(Tuple b) throws IOException;
+
+    /**
+     * Called when all tuples from current key have been passed to accumulate.
+     * @return the value for the UDF for this key.
+     */
+    public T getValue();
+    
+    /** 
+     * Called after getValue() to prepare processing for next key. 
+     */
+    public void cleanup();
+}

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java Thu Nov 12 18:33:15 2009
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.Accumulator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POMapLookUp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor over the map-reduce plan that determines, for each job, whether
+ * its reduce plan can run in accumulative mode and marks it accordingly.
+ */
+public class AccumulatorOptimizer extends MROpPlanVisitor {
+
+    private Log log = LogFactory.getLog(getClass());
+
+    public AccumulatorOptimizer(MROperPlan plan) {
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+    }
+
+    /**
+     * Marks the reduce plan accumulative when its first root is a non-distinct
+     * POPackage without inner inputs, followed by a single POForEach whose
+     * input plans all pass check() and at least one of them contains a UDF.
+     *
+     * @param mr the map-reduce operator being visited
+     */
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+        // See if this is a map-reduce job
+        List<PhysicalOperator> pos = mr.reducePlan.getRoots();
+        if (pos == null || pos.size() == 0) {
+            return;
+        }
+
+        // See if this is a POPackage
+        PhysicalOperator po_package = pos.get(0);
+        if (!po_package.getClass().equals(POPackage.class)) {
+            return;
+        }
+
+        // if POPackage is for distinct, just return
+        if (((POPackage)po_package).isDistinct()) {
+            return;
+        }
+
+        // if any input to POPackage is inner, just return
+        boolean[] isInner = ((POPackage)po_package).getInner();
+        for(boolean b: isInner) {
+            if (b) {
+                return;
+            }
+        }
+
+        List<PhysicalOperator> l = mr.reducePlan.getSuccessors(po_package);
+        // there should be only one POForEach
+        if (l == null || l.size() != 1) {
+            return;
+        }
+
+        PhysicalOperator po_foreach = l.get(0);
+        if (!(po_foreach instanceof POForEach)) {
+            return;
+        }
+
+        boolean foundUDF = false;
+        List<PhysicalPlan> list = ((POForEach)po_foreach).getInputPlans();
+        for(PhysicalPlan p: list) {
+            PhysicalOperator po = p.getLeaves().get(0);
+
+            // only expression operators are allowed
+            if (!(po instanceof ExpressionOperator)) {
+                return;
+            }
+
+            if (((ExpressionOperator)po).containUDF()) {
+                foundUDF = true;
+            }
+
+            if (!check(po)) {
+                return;
+            }
+        }
+
+        if (foundUDF) {
+            // if all tests are passed, reducer can run in accumulative mode
+            log.info("Reducer is to run in accumulative mode.");
+            po_package.setAccumulative();
+            po_foreach.setAccumulative();
+        }
+    }
+
+    /**
+     * Check if an operator is qualified to be under POForEach
+     * to turn on accumulator. The operator must be in the following list or
+     * a <code>POUserFunc</code>.
+     *
+     * If the operator has sub-operators, they must also belong to this list.
+     * <li>ConstantExpression</li>
+     * <li>POProject, whose result type is not BAG, or TUPLE and overloaded</li>
+     * <li>POMapLookUp</li>
+     * <li>POCast</li>
+     * <li>UnaryExpressionOperator</li>
+     * <li>BinaryExpressionOperator</li>
+     * <li>POBinCond</li>
+     *
+     * If the operator is <code>POUserFunc</code>, it must implement
+     * <code>Accumulator</code> interface and its inputs pass the check
+     * by calling <code>checkUDFInput()</code>
+     *
+     * @param po the operator to be checked on
+     * @return <code>true</code> if it is ok, <code>false</code>
+     *    if not.
+     */
+    private boolean check(PhysicalOperator po) {
+        if (po instanceof ConstantExpression) {
+            return true;
+        }
+
+        if (po instanceof POCast) {
+            return check(po.getInputs().get(0));
+        }
+
+        if (po instanceof POMapLookUp) {
+            return true;
+        }
+
+        if (po instanceof POProject) {
+            POProject proj = (POProject)po;
+            // POProject can neither project a data bag nor overload one
+            return proj.getResultType() != DataType.BAG
+                && !(proj.getResultType() == DataType.TUPLE && proj.isOverloaded());
+        }
+
+        if (po instanceof UnaryExpressionOperator) {
+            return check(((UnaryExpressionOperator)po).getExpr());
+        }
+
+        if (po instanceof BinaryExpressionOperator) {
+            return check(((BinaryExpressionOperator)po).getLhs()) &&
+                    check(((BinaryExpressionOperator)po).getRhs());
+        }
+
+        if (po instanceof POBinCond) {
+            return check(((POBinCond)po).getLhs()) &&
+                check(((POBinCond)po).getRhs()) && check(((POBinCond)po).getCond());
+        }
+
+        if (po instanceof POUserFunc) {
+            String className = ((POUserFunc)po).getFuncSpec().getClassName();
+            Class<?> c;
+            try {
+                c = PigContext.resolveClassName(className);
+            } catch(Exception e) {
+                // a UDF class that cannot be resolved disqualifies the plan
+                return false;
+            }
+            if (!Accumulator.class.isAssignableFrom(c)) {
+                return false;
+            }
+
+            // check input of UDF
+            List<PhysicalOperator> inputs = po.getInputs();
+            for(PhysicalOperator p: inputs) {
+                if (!checkUDFInput(p)) {
+                    return false;
+                }
+            }
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Check an operator feeding a <code>POUserFunc</code> to verify that the UDF
+     * can run as accumulator. The operator and all of its sub-operators must
+     * belong to the following list.
+     *
+     * <li>PORelationToExprProject</li>
+     * <li>ConstantExpression</li>
+     * <li>POProject</li>
+     * <li>POCast</li>
+     * <li>UnaryExpressionOperator</li>
+     * <li>BinaryExpressionOperator</li>
+     * <li>POBinCond</li>
+     * <li>POSortedDistinct</li>
+     * <li>POForEach</li>
+     *
+     * @param po the UDF input operator to be checked
+     * @return <code>true</code> if it is ok, <code>false</code> if not.
+     */
+    private boolean checkUDFInput(PhysicalOperator po) {
+        if (po instanceof PORelationToExprProject) {
+            return checkUDFInput(po.getInputs().get(0));
+        }
+
+        if (po instanceof POProject || po instanceof ConstantExpression) {
+            return true;
+        }
+
+        if (po instanceof UnaryExpressionOperator) {
+            return checkUDFInput(((UnaryExpressionOperator)po).getExpr());
+        }
+
+        if (po instanceof BinaryExpressionOperator) {
+            // both operands must qualify, as in check(); accepting either one
+            // would let an unsupported operator slip in beside a valid sibling
+            return checkUDFInput(((BinaryExpressionOperator)po).getLhs()) &&
+                checkUDFInput(((BinaryExpressionOperator)po).getRhs());
+        }
+
+        if (po instanceof POCast) {
+            return checkUDFInput(po.getInputs().get(0));
+        }
+
+        if (po instanceof POBinCond) {
+            // likewise the condition and both branches must all qualify
+            return checkUDFInput(((POBinCond)po).getLhs()) &&
+                checkUDFInput(((POBinCond)po).getRhs()) && checkUDFInput(((POBinCond)po).getCond());
+        }
+
+        if (po instanceof POSortedDistinct) {
+            return true;
+        }
+
+        if (po instanceof POForEach) {
+            List<PhysicalPlan> list = ((POForEach)po).getInputPlans();
+            if (list.size() != 1) {
+                return false;
+            }
+
+            // the single nested plan and the foreach's own input must both qualify
+            PhysicalOperator p = list.get(0).getLeaves().get(0);
+            if (checkUDFInput(p)) {
+                return checkUDFInput(po.getInputs().get(0));
+            }
+        }
+
+        return false;
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Nov 12 18:33:15 2009
@@ -181,18 +181,18 @@
             //if the job controller fails before launching the jobs then there are
             //no jobs to check for failure
             if(jobControlException != null) {
-            	if(jobControlException instanceof PigException) {
-            	        if(jobControlExceptionStackTrace != null) {
-            	            LogUtils.writeLog("Error message from job controller", jobControlExceptionStackTrace, 
-            	                    pc.getProperties().getProperty("pig.logfile"), 
+                if(jobControlException instanceof PigException) {
+                        if(jobControlExceptionStackTrace != null) {
+                            LogUtils.writeLog("Error message from job controller", jobControlExceptionStackTrace, 
+                                    pc.getProperties().getProperty("pig.logfile"), 
                                     log);
-            	        }
+                        }
                         throw jobControlException;
-            	} else {
+                } else {
                         int errCode = 2117;
                         String msg = "Unexpected error when launching map reduce job.";        	
                         throw new ExecException(msg, errCode, PigException.BUG, jobControlException);
-            	}
+                }
             }
 
             if (!jc.getFailedJobs().isEmpty() )
@@ -424,6 +424,9 @@
         EndOfAllInputSetter checker = new EndOfAllInputSetter(plan);
         checker.visit();
         
+        AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
+        accum.visit();
+        
         return plan;
     }
     
@@ -434,29 +437,29 @@
      * explicitly or if the default handler is null
      */
     class JobControlThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
-    	
-    	public void uncaughtException(Thread thread, Throwable throwable) {
-    		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    		PrintStream ps = new PrintStream(baos);
-    		throwable.printStackTrace(ps);
-    		jobControlExceptionStackTrace = baos.toString();    		
-    		try {	
-    			jobControlException = getExceptionFromString(jobControlExceptionStackTrace);
-    		} catch (Exception e) {
-    			String errMsg = "Could not resolve error that occured when launching map reduce job: "
+        
+        public void uncaughtException(Thread thread, Throwable throwable) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            PrintStream ps = new PrintStream(baos);
+            throwable.printStackTrace(ps);
+            jobControlExceptionStackTrace = baos.toString();    		
+            try {	
+                jobControlException = getExceptionFromString(jobControlExceptionStackTrace);
+            } catch (Exception e) {
+                String errMsg = "Could not resolve error that occured when launching map reduce job: "
                         + getFirstLineFromMessage(jobControlExceptionStackTrace);
-    			jobControlException = new RuntimeException(errMsg);
-    		}
-    	}
+                jobControlException = new RuntimeException(errMsg);
+            }
+        }
     }
     
     void computeWarningAggregate(Job job, JobClient jobClient, Map<Enum, Long> aggMap) {
-    	JobID mapRedJobID = job.getAssignedJobID();
-    	RunningJob runningJob = null;
-    	try {
-    		runningJob = jobClient.getJob(mapRedJobID);
-    		if(runningJob != null) {
-        		Counters counters = runningJob.getCounters();
+        JobID mapRedJobID = job.getAssignedJobID();
+        RunningJob runningJob = null;
+        try {
+            runningJob = jobClient.getJob(mapRedJobID);
+            if(runningJob != null) {
+                Counters counters = runningJob.getCounters();
                 if (counters==null)
                 {
                     long nullCounterCount = aggMap.get(PigWarning.NULL_COUNTER_COUNT)==null?0 : aggMap.get(PigWarning.NULL_COUNTER_COUNT);
@@ -479,11 +482,11 @@
                         aggMap.put(e, currentCount);
                     }
                 }
-    		}
-    	} catch (IOException ioe) {
-    		String msg = "Unable to retrieve job to compute warning aggregation.";
-    		log.warn(msg);
-    	}    	
+            }
+        } catch (IOException ioe) {
+            String msg = "Unable to retrieve job to compute warning aggregation.";
+            log.warn(msg);
+        }    	
     }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java Thu Nov 12 18:33:15 2009
@@ -30,5 +30,9 @@
     // between ExecutableManager and POStream
     public static final byte STATUS_EOS = 4; // end of Streaming output (i.e. output from streaming binary)
 
+    // successful processing of a batch;
+    // this is used for accumulative UDFs
+    public static final byte STATUS_BATCH_OK = 5; 
+
     public static Object result;
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Add.java Thu Nov 12 18:33:15 2009
@@ -53,6 +53,11 @@
 
     @Override
     public Result getNext(Double d) throws ExecException {
+        Result r = accumChild(null, d);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Double left = null, right = null;
@@ -76,6 +81,11 @@
     
     @Override
     public Result getNext(Float f) throws ExecException {
+        Result r = accumChild(null, f);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Float left = null, right = null;
@@ -98,7 +108,12 @@
     }
     
     @Override
-    public Result getNext(Integer i) throws ExecException {
+    public Result getNext(Integer i) throws ExecException {    	
+        Result r = accumChild(null, i);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Integer left = null, right = null;
@@ -122,6 +137,11 @@
     
     @Override
     public Result getNext(Long l) throws ExecException {
+        Result r = accumChild(null, l);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Long left = null, right = null;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java Thu Nov 12 18:33:15 2009
@@ -17,6 +17,9 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.pig.impl.plan.OperatorKey;
 
 /**
@@ -31,6 +34,7 @@
     
     protected ExpressionOperator lhs;
     protected ExpressionOperator rhs;
+    private transient List<ExpressionOperator> child;
     
     public BinaryExpressionOperator(OperatorKey k) {
         this(k,-1);
@@ -44,6 +48,18 @@
         return lhs;
     }
     
+    /**
+     * Get the child expressions of this expression: lhs then rhs, cached after the first call
+     */
+    public List<ExpressionOperator> getChildExpressions() {
+        if (child == null) {
+            child = new ArrayList<ExpressionOperator>();    	
+            child.add(lhs);    	
+            child.add(rhs);
+        }
+        return child;
+    }
+    
     @Override
     public boolean supportsMultipleInputs() {
         return true;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java Thu Nov 12 18:33:15 2009
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.pig.backend.executionengine.ExecException;
@@ -195,4 +196,12 @@
         clone.cloneHelper(this);
         return clone;
     }
+    
+    /**
+     * Get the child expressions of this expression; always null here (not an empty list)
+     */
+    @Override
+    public List<ExpressionOperator> getChildExpressions() {		
+        return null;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java Thu Nov 12 18:33:15 2009
@@ -54,6 +54,11 @@
 
     @Override
     public Result getNext(Double d) throws ExecException {
+        Result r = accumChild(null, d);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Double left = null, right = null;
@@ -72,9 +77,9 @@
         right = (Double) res.result;
         
         if (right == 0) {
-        	if(pigLogger != null) {
-        		pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
-        	}
+            if(pigLogger != null) {
+                pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
+            }
             res.result = null;
         }
         else
@@ -84,6 +89,11 @@
     
     @Override
     public Result getNext(Float f) throws ExecException {
+        Result r = accumChild(null, f);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Float left = null, right = null;
@@ -102,9 +112,9 @@
         right = (Float) res.result;
         
         if (right == 0) {
-        	if(pigLogger != null) {
-        		pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
-        	}
+            if(pigLogger != null) {
+                pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
+            }
             res.result = null;
         }
         else
@@ -114,6 +124,11 @@
     
     @Override
     public Result getNext(Integer i) throws ExecException {
+        Result r = accumChild(null, i);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Integer left = null, right = null;
@@ -132,10 +147,10 @@
         right = (Integer) res.result;
         
         if (right == 0) {
-        	if(pigLogger != null) {
-        		pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
-        	}
-        	res.result = null;
+            if(pigLogger != null) {
+                pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
+            }
+            res.result = null;
         }
         else
             res.result = Integer.valueOf(left / right);
@@ -144,6 +159,11 @@
     
     @Override
     public Result getNext(Long l) throws ExecException {
+        Result r = accumChild(null, l);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Long left = null, right = null;
@@ -162,9 +182,9 @@
         right = (Long) res.result;
         
         if (right == 0) {
-        	if(pigLogger != null) {
-        		pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
-        	}
+            if(pigLogger != null) {
+                pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
+            }
             res.result = null;
         }
         else

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java Thu Nov 12 18:33:15 2009
@@ -65,36 +65,60 @@
 
         switch (operandType) {
         case DataType.BYTEARRAY: {
+            Result r = accumChild(null, dummyDBA);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyDBA);
             right = rhs.getNext(dummyDBA);
             return doComparison(left, right);
                             }
 
         case DataType.DOUBLE: {
+            Result r = accumChild(null, dummyDouble);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyDouble);
             right = rhs.getNext(dummyDouble);
             return doComparison(left, right);
                             }
 
         case DataType.FLOAT: {
+            Result r = accumChild(null, dummyFloat);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyFloat);
             right = rhs.getNext(dummyFloat);
             return doComparison(left, right);
                             }
 
         case DataType.INTEGER: {
+            Result r = accumChild(null, dummyInt);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyInt);
             right = rhs.getNext(dummyInt);
             return doComparison(left, right);
                             }
 
         case DataType.LONG: {
+            Result r = accumChild(null, dummyLong);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyLong);
             right = rhs.getNext(dummyLong);
             return doComparison(left, right);
                             }
 
         case DataType.CHARARRAY: {
+            Result r = accumChild(null, dummyString);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyString);
             right = rhs.getNext(dummyString);
             return doComparison(left, right);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java Thu Nov 12 18:33:15 2009
@@ -18,15 +18,20 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
 
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.impl.plan.VisitorException;
@@ -60,6 +65,7 @@
     }
     
     public abstract void visit(PhyPlanVisitor v) throws VisitorException;
+    
 
     /**
      * Make a deep copy of this operator.  This is declared here to make it
@@ -73,5 +79,319 @@
         throw new CloneNotSupportedException(s);
     }
 
+    /**
+     * Get the sub-expressions of this expression (may be null if there are none).
+     * When the reducer runs in accumulative mode, every child expression that
+     * contains a UDF must be called so that the UDF's accumulate() is driven.
+     */
+    protected abstract List<ExpressionOperator> getChildExpressions();
+    
+    /**
+     * Check whether this expression, or any expression nested beneath it, is a UDF.
+     * Called when the reducer runs in accumulative mode, where every UDF must be driven.
+     * @return true if this is a POUserFunc or any child expression contains one
+     */
+    public boolean containUDF() {
+        if (this instanceof POUserFunc) {
+            return true;
+        }
+        List<ExpressionOperator> l = getChildExpressions();
+        if (l != null) {
+            for (ExpressionOperator e : l) {
+                if (e.containUDF()) { // keep scanning: a later sibling may hold the UDF
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
 
+    /**
+     * Drive all the UDFs in accumulative mode
+     */
+    protected Result accumChild(List<ExpressionOperator> child, Double d) throws ExecException {
+        if (isAccumStarted()) {
+            if (child == null) {
+                child = getChildExpressions();
+            }
+            Result res = null;
+            if (child != null) {        		
+                for(ExpressionOperator e: child) {        			
+                    if (e.containUDF()) {    
+                        res = e.getNext(d);
+                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
+                            return res;
+                        }
+                    }
+                }
+            }
+            
+            res = new Result();
+            res.returnStatus = POStatus.STATUS_BATCH_OK;
+            
+            return res;
+        }
+        
+        return null;
+    }
+       
+    /**
+     * Drive all the UDFs in accumulative mode
+     */
+    protected Result accumChild(List<ExpressionOperator> child, Integer v) throws ExecException {    	
+        if (isAccumStarted()) {    		
+            if (child == null) {
+                child = getChildExpressions();
+            }
+            Result res = null;
+            if (child != null) {        		
+                for(ExpressionOperator e: child) {        			
+                    if (e.containUDF()) {            				
+                        res = e.getNext(v);
+                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
+                            return res;
+                        }
+                    }
+                }
+            }
+            
+            res = new Result();
+            res.returnStatus = POStatus.STATUS_BATCH_OK;
+            
+            return res;
+        }
+        
+        return null;
+    }
+       
+    /**
+     * Drive all the UDFs in accumulative mode
+     */
+    protected Result accumChild(List<ExpressionOperator> child, Long l) throws ExecException {
+        if (isAccumStarted()) {
+            if (child == null) {
+                child = getChildExpressions();
+            }
+            Result res = null;
+            if (child != null) {        		
+                for(ExpressionOperator e: child) {        			
+                    if (e.containUDF()) {    
+                        res = e.getNext(l);
+                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
+                            return res;
+                        }
+                    }
+                }
+            }
+            
+            res = new Result();
+            res.returnStatus = POStatus.STATUS_BATCH_OK;
+            
+            return res;
+        }
+        
+        return null;
+    }
+    
+    /**
+     * Drive all the UDFs in accumulative mode
+     */
+    protected Result accumChild(List<ExpressionOperator> child, Float f) throws ExecException {
+        if (isAccumStarted()) {
+            if (child == null) {
+                child = getChildExpressions();
+            }
+            Result res = null;
+            if (child != null) {        		
+                for(ExpressionOperator e: child) {        			
+                    if (e.containUDF()) {    
+                        res = e.getNext(f);
+                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
+                            return res;
+                        }
+                    }
+                }
+            }
+            
+            res = new Result();
+            res.returnStatus = POStatus.STATUS_BATCH_OK;
+            
+            return res;
+        }
+        
+        return null;
+    }
+       
+    /**
+     * Drive all the UDFs in accumulative mode
+     */
+    protected Result accumChild(List<ExpressionOperator> child, Boolean b) throws ExecException {
+        if (isAccumStarted()) {
+            if (child == null) {
+                child = getChildExpressions();
+            }
+            Result res = null;
+            if (child != null) {        		
+                for(ExpressionOperator e: child) {           			
+                    if (e.containUDF()) {    
+                        res = e.getNext(b);
+                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
+                            return res;
+                        }
+                    }
+                }
+            }
+            
+            res = new Result();
+            res.returnStatus = POStatus.STATUS_BATCH_OK;
+            
+            return res;
+        }
+        
+        return null;
+    }
+    
+    /**
+     * Drive all the UDFs in accumulative mode
+     */
+    protected Result accumChild(List<ExpressionOperator> child, String s) throws ExecException {
+        if (isAccumStarted()) {
+            if (child == null) {
+                child = getChildExpressions();
+            }
+            Result res = null;
+            if (child != null) {        		
+                for(ExpressionOperator e: child) {        			
+                    if (e.containUDF()) {    
+                        res = e.getNext(s);
+                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
+                            return res;
+                        }
+                    }
+                }
+            }
+            
+            res = new Result();
+            res.returnStatus = POStatus.STATUS_BATCH_OK;
+            
+            return res;
+        }
+        
+        return null;
+    }
+       
+    /**
+     * Drive all the UDFs in accumulative mode
+     */
+    protected Result accumChild(List<ExpressionOperator> child, DataByteArray dba) throws ExecException {
+        if (isAccumStarted()) {
+            if (child == null) {
+                child = getChildExpressions();
+            }
+            Result res = null;
+            if (child != null) {        		
+                for(ExpressionOperator e: child) {        			
+                    if (e.containUDF()) {    
+                        res = e.getNext(dba);
+                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
+                            return res;
+                        }
+                    }
+                }
+            }
+            
+            res = new Result();
+            res.returnStatus = POStatus.STATUS_BATCH_OK;
+            
+            return res;
+        }
+        
+        return null;
+    }
+    
+    /**
+     * Drive all the UDFs in accumulative mode
+     */
+    protected Result accumChild(List<ExpressionOperator> child, Map map) throws ExecException {
+        if (isAccumStarted()) {
+            if (child == null) {
+                child = getChildExpressions();
+            }
+            Result res = null;
+            if (child != null) {        		
+                for(ExpressionOperator e: child) {        			
+                    if (e.containUDF()) {    
+                        res = e.getNext(map);
+                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
+                            return res;
+                        }
+                    }
+                }
+            }
+            
+            res = new Result();
+            res.returnStatus = POStatus.STATUS_BATCH_OK;
+            
+            return res;
+        }
+        
+        return null;
+    }
+    
+    /**
+     * Drive all the UDFs in accumulative mode
+     */
+    protected Result accumChild(List<ExpressionOperator> child, Tuple t) throws ExecException {
+        if (isAccumStarted()) {
+            if (child == null) {
+                child = getChildExpressions();
+            }
+            Result res = null;
+            if (child != null) {        		
+                for(ExpressionOperator e: child) {        			
+                    if (e.containUDF()) {    
+                        res = e.getNext(t);
+                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
+                            return res;
+                        }
+                    }
+                }
+            }
+            
+            res = new Result();
+            res.returnStatus = POStatus.STATUS_BATCH_OK;
+            
+            return res;
+        }
+        
+        return null;
+    }
+    
+    /**
+     * Drive all the UDFs in accumulative mode
+     */
+    protected Result accumChild(List<ExpressionOperator> child, DataBag db) throws ExecException {
+        if (isAccumStarted()) {
+            if (child == null) {
+                child = getChildExpressions();
+            }
+            Result res = null;
+            if (child != null) {        		
+                for(ExpressionOperator e: child) {        			
+                    if (e.containUDF()) {    
+                        res = e.getNext(db);
+                        if (res.returnStatus != POStatus.STATUS_BATCH_OK) {
+                            return res;
+                        }
+                    }
+                }
+            }
+            
+            res = new Result();
+            res.returnStatus = POStatus.STATUS_BATCH_OK;
+            
+            return res;
+        }
+        
+        return null;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java Thu Nov 12 18:33:15 2009
@@ -65,36 +65,60 @@
 
         switch (operandType) {
         case DataType.BYTEARRAY: {
+            Result r = accumChild(null, dummyDBA);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyDBA);
             right = rhs.getNext(dummyDBA);
             return doComparison(left, right);
                             }
 
         case DataType.DOUBLE: {
+            Result r = accumChild(null, dummyDouble);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyDouble);
             right = rhs.getNext(dummyDouble);
             return doComparison(left, right);
                             }
 
         case DataType.FLOAT: {
+            Result r = accumChild(null, dummyFloat);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyFloat);
             right = rhs.getNext(dummyFloat);
             return doComparison(left, right);
                             }
 
         case DataType.INTEGER: {
+            Result r = accumChild(null, dummyInt);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyInt);
             right = rhs.getNext(dummyInt);
             return doComparison(left, right);
                             }
 
         case DataType.LONG: {
+            Result r = accumChild(null, dummyLong);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyLong);
             right = rhs.getNext(dummyLong);
             return doComparison(left, right);
                             }
 
         case DataType.CHARARRAY: {
+            Result r = accumChild(null, dummyString);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyString);
             right = rhs.getNext(dummyString);
             return doComparison(left, right);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java Thu Nov 12 18:33:15 2009
@@ -65,36 +65,60 @@
 
         switch (operandType) {
         case DataType.BYTEARRAY: {
+            Result r = accumChild(null, dummyDBA);
+            if (r != null) {
+                return r;
+            }        	
             left = lhs.getNext(dummyDBA);
             right = rhs.getNext(dummyDBA);
             return doComparison(left, right);
                             }
 
         case DataType.DOUBLE: {
+            Result r = accumChild(null, dummyDouble);
+            if (r != null) {
+                return r;
+            }        	
             left = lhs.getNext(dummyDouble);
             right = rhs.getNext(dummyDouble);
             return doComparison(left, right);
                             }
 
         case DataType.FLOAT: {
+            Result r = accumChild(null, dummyFloat);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyFloat);
             right = rhs.getNext(dummyFloat);
             return doComparison(left, right);
                             }
 
         case DataType.INTEGER: {
+            Result r = accumChild(null, dummyInt);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyInt);
             right = rhs.getNext(dummyInt);
             return doComparison(left, right);
                             }
 
         case DataType.LONG: {
+            Result r = accumChild(null, dummyLong);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyLong);
             right = rhs.getNext(dummyLong);
             return doComparison(left, right);
                             }
 
         case DataType.CHARARRAY: {
+            Result r = accumChild(null, dummyString);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyString);
             right = rhs.getNext(dummyString);
             return doComparison(left, right);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java Thu Nov 12 18:33:15 2009
@@ -65,36 +65,60 @@
 
         switch (operandType) {
         case DataType.BYTEARRAY: {
+            Result r = accumChild(null, dummyDBA);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyDBA);
             right = rhs.getNext(dummyDBA);
             return doComparison(left, right);
                             }
 
         case DataType.DOUBLE: {
+            Result r = accumChild(null, dummyDouble);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyDouble);
             right = rhs.getNext(dummyDouble);
             return doComparison(left, right);
                             }
 
         case DataType.FLOAT: {
+            Result r = accumChild(null, dummyFloat);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyFloat);
             right = rhs.getNext(dummyFloat);
             return doComparison(left, right);
                             }
 
         case DataType.INTEGER: {
+            Result r = accumChild(null, dummyInt);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyInt);
             right = rhs.getNext(dummyInt);
             return doComparison(left, right);
                             }
 
         case DataType.LONG: {
+            Result r = accumChild(null, dummyLong);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyLong);
             right = rhs.getNext(dummyLong);
             return doComparison(left, right);
                             }
 
         case DataType.CHARARRAY: {
+            Result r = accumChild(null, dummyString);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyString);
             right = rhs.getNext(dummyString);
             return doComparison(left, right);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java Thu Nov 12 18:33:15 2009
@@ -65,36 +65,60 @@
 
         switch (operandType) {
         case DataType.BYTEARRAY: {
+            Result r = accumChild(null, dummyDBA);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyDBA);
             right = rhs.getNext(dummyDBA);
             return doComparison(left, right);
                             }
 
         case DataType.DOUBLE: {
+            Result r = accumChild(null, dummyDouble);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyDouble);
             right = rhs.getNext(dummyDouble);
             return doComparison(left, right);
                             }
 
         case DataType.FLOAT: {
+            Result r = accumChild(null, dummyFloat);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyFloat);
             right = rhs.getNext(dummyFloat);
             return doComparison(left, right);
                             }
 
         case DataType.INTEGER: {
+            Result r = accumChild(null, dummyInt);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyInt);
             right = rhs.getNext(dummyInt);
             return doComparison(left, right);
                             }
 
         case DataType.LONG: {
+            Result r = accumChild(null, dummyLong);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyLong);
             right = rhs.getNext(dummyLong);
             return doComparison(left, right);
                             }
 
         case DataType.CHARARRAY: {
+            Result r = accumChild(null, dummyString);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyString);
             right = rhs.getNext(dummyString);
             return doComparison(left, right);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Mod.java Thu Nov 12 18:33:15 2009
@@ -53,6 +53,11 @@
     
     @Override
     public Result getNext(Integer i) throws ExecException{
+        Result r = accumChild(null, i);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Integer left = null, right = null;
@@ -76,6 +81,11 @@
     
     @Override
     public Result getNext(Long i) throws ExecException{
+        Result r = accumChild(null, i);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Long left = null, right = null;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Multiply.java Thu Nov 12 18:33:15 2009
@@ -53,6 +53,11 @@
 
     @Override
     public Result getNext(Double d) throws ExecException {
+        Result r = accumChild(null, d);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Double left = null, right = null;
@@ -76,6 +81,11 @@
     
     @Override
     public Result getNext(Float f) throws ExecException {
+        Result r = accumChild(null, f);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Float left = null, right = null;
@@ -99,6 +109,11 @@
     
     @Override
     public Result getNext(Integer i) throws ExecException {
+        Result r = accumChild(null, i);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Integer left = null, right = null;
@@ -122,6 +137,11 @@
     
     @Override
     public Result getNext(Long l) throws ExecException {
+        Result r = accumChild(null, l);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Long left = null, right = null;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java Thu Nov 12 18:33:15 2009
@@ -65,36 +65,60 @@
 
         switch (operandType) {
         case DataType.BYTEARRAY: {
+            Result r = accumChild(null, dummyDBA);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyDBA);
             right = rhs.getNext(dummyDBA);
             return doComparison(left, right);
                             }
 
         case DataType.DOUBLE: {
+            Result r = accumChild(null, dummyDouble);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyDouble);
             right = rhs.getNext(dummyDouble);
             return doComparison(left, right);
                             }
 
         case DataType.FLOAT: {
+            Result r = accumChild(null, dummyFloat);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyFloat);
             right = rhs.getNext(dummyFloat);
             return doComparison(left, right);
                             }
 
         case DataType.INTEGER: {
+            Result r = accumChild(null, dummyInt);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyInt);
             right = rhs.getNext(dummyInt);
             return doComparison(left, right);
                             }
 
         case DataType.LONG: {
+            Result r = accumChild(null, dummyLong);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyLong);
             right = rhs.getNext(dummyLong);
             return doComparison(left, right);
                             }
 
         case DataType.CHARARRAY: {
+            Result r = accumChild(null, dummyString);
+            if (r != null) {
+                return r;
+            }
             left = lhs.getNext(dummyString);
             right = rhs.getNext(dummyString);
             return doComparison(left, right);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java Thu Nov 12 18:33:15 2009
@@ -58,6 +58,11 @@
 
     @Override
     public Result getNext(Boolean b) throws ExecException {
+        Result r = accumChild(null, dummyBool);
+        if (r != null) {
+            return r;
+        }
+        
         Result left;
         left = lhs.getNext(dummyBool);
         // pass on ERROR and EOP 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java Thu Nov 12 18:33:15 2009
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.pig.backend.executionengine.ExecException;
@@ -37,6 +39,7 @@
     ExpressionOperator cond;
     ExpressionOperator lhs;
     ExpressionOperator rhs;
+    private transient List<ExpressionOperator> child;
     
     public POBinCond(OperatorKey k) {
         super(k);
@@ -55,6 +58,11 @@
     
     @Override
     public Result getNext(Boolean b) throws ExecException {
+        Result r = accumChild(null, b);
+        if (r != null) {
+            return r;
+        }
+        
         Result res = cond.getNext(b);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
         return ((Boolean)res.result) == true ? lhs.getNext(b) : rhs.getNext(b);
@@ -63,6 +71,21 @@
 
     @Override
     public Result getNext(DataBag db) throws ExecException {
+        List<ExpressionOperator> l = new ArrayList<ExpressionOperator>();
+        l.add(cond);
+        Result r = accumChild(l, dummyBool);
+        
+        if (r != null) {    	
+            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+                return r;
+            }
+            l.clear();
+            l.add(lhs);
+            l.add(rhs);
+            r = accumChild(l, db);
+            return r;    		
+        }
+                        
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
         return ((Boolean)res.result) == true ? lhs.getNext(db) : rhs.getNext(db);
@@ -70,6 +93,20 @@
 
     @Override
     public Result getNext(DataByteArray ba) throws ExecException {
+        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+        list.add(cond);
+        Result r = accumChild(list, dummyBool);
+        
+        if (r != null) {    	
+            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+                return r;
+            }
+            list.clear();
+            list.add(lhs);
+            list.add(rhs);
+            r = accumChild(list, ba);
+            return r;    		
+        }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
         return ((Boolean)res.result) == true ? lhs.getNext(ba) : rhs.getNext(ba);
@@ -77,6 +114,20 @@
 
     @Override
     public Result getNext(Double d) throws ExecException {
+        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+        list.add(cond);
+        Result r = accumChild(list, dummyBool);
+        
+        if (r != null) {    	
+            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+                return r;
+            }
+            list.clear();
+            list.add(lhs);
+            list.add(rhs);
+            r = accumChild(list, d);
+            return r;    		
+        }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
         return ((Boolean)res.result) == true ? lhs.getNext(d) : rhs.getNext(d);
@@ -84,6 +135,20 @@
 
     @Override
     public Result getNext(Float f) throws ExecException {
+        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+        list.add(cond);
+        Result r = accumChild(list, dummyBool);
+        
+        if (r != null) {    	
+            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+                return r;
+            }
+            list.clear();
+            list.add(lhs);
+            list.add(rhs);
+            r = accumChild(list, f);
+            return r;    		
+        }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
         return ((Boolean)res.result) == true ? lhs.getNext(f) : rhs.getNext(f);
@@ -91,6 +156,19 @@
 
     @Override
     public Result getNext(Integer i) throws ExecException {
+        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+        list.add(cond);
+        Result r = accumChild(list, dummyBool);    	
+        if (r != null) {    	
+            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+                return r;
+            }
+            list.clear();
+            list.add(lhs);
+            list.add(rhs);
+            r = accumChild(list, i);
+            return r;    		
+        }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
         return ((Boolean)res.result) == true ? lhs.getNext(i) : rhs.getNext(i);
@@ -98,6 +176,20 @@
 
     @Override
     public Result getNext(Long l) throws ExecException {
+        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+        list.add(cond);
+        Result r = accumChild(list, dummyBool);
+        
+        if (r != null) {    	
+            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+                return r;
+            }
+            list.clear();
+            list.add(lhs);
+            list.add(rhs);
+            r = accumChild(list, l);
+            return r;    		
+        }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
         return ((Boolean)res.result) == true ? lhs.getNext(l) : rhs.getNext(l);
@@ -105,6 +197,20 @@
 
     @Override
     public Result getNext(Map m) throws ExecException {
+        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+        list.add(cond);
+        Result r = accumChild(list, dummyBool);
+        
+        if (r != null) {    	
+            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+                return r;
+            }
+            list.clear();
+            list.add(lhs);
+            list.add(rhs);
+            r = accumChild(list, m);
+            return r;    		
+        }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
         return ((Boolean)res.result) == true ? lhs.getNext(m) : rhs.getNext(m);
@@ -112,6 +218,20 @@
 
     @Override
     public Result getNext(String s) throws ExecException {
+        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+        list.add(cond);
+        Result r = accumChild(list, dummyBool);
+        
+        if (r != null) {    	
+            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+                return r;
+            }
+            list.clear();
+            list.add(lhs);
+            list.add(rhs);
+            r = accumChild(list, s);
+            return r;    		
+        }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
         return ((Boolean)res.result) == true ? lhs.getNext(s) : rhs.getNext(s);
@@ -119,6 +239,20 @@
 
     @Override
     public Result getNext(Tuple t) throws ExecException {
+        List<ExpressionOperator> list = new ArrayList<ExpressionOperator>();
+        list.add(cond);
+        Result r = accumChild(list, dummyBool);
+        
+        if (r != null) {    	
+            if (r.returnStatus != POStatus.STATUS_BATCH_OK) {
+                return r;
+            }
+            list.clear();
+            list.add(lhs);
+            list.add(rhs);
+            r = accumChild(list, t);
+            return r;    		
+        }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
         return ((Boolean)res.result) == true ? lhs.getNext(t) : rhs.getNext(t);
@@ -153,6 +287,27 @@
         this.lhs = lhs;
     }
 
+    /**
+     * Get the condition expression whose Boolean result selects lhs (true) or rhs (false)
+     */
+    public ExpressionOperator getCond() {
+        return this.cond;
+    }
+    
+    /**
+     * Get the right expression, the one evaluated when the condition is false
+     */
+    public ExpressionOperator getRhs() {
+        return this.rhs;
+    }
+    
+    /**
+     * Get the left expression, the one evaluated when the condition is true
+     */
+    public ExpressionOperator getLhs() {
+        return this.lhs;
+    }
+    
     @Override
     public boolean supportsMultipleInputs() {
         return true;
@@ -169,4 +324,18 @@
         return clone;
     }
 
+    /**
+     * Get child expressions of this expression, in the order cond, lhs, rhs.
+     * The list is rebuilt on each call, so it always reflects the current fields.
+     */
+    @Override
+    public List<ExpressionOperator> getChildExpressions() {
+        // A list cached on first use went stale once cond, lhs or rhs was reassigned.
+        child = new ArrayList<ExpressionOperator>(3);
+        child.add(cond);
+        child.add(lhs);
+        child.add(rhs);
+        return child;
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Thu Nov 12 18:33:15 2009
@@ -19,6 +19,8 @@
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -52,6 +54,7 @@
     transient private Log log = LogFactory.getLog(getClass());
     private boolean castNotNeeded = false;
     private Byte realType = null;
+    private transient List<ExpressionOperator> child;
 
     private static final long serialVersionUID = 1L;
 
@@ -328,7 +331,7 @@
             String str = null;
             Result res = in.getNext(str);
             if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-            	res.result = CastUtils.stringToLong((String)res.result);
+                res.result = CastUtils.stringToLong((String)res.result);
             }
             return res;
         }
@@ -449,7 +452,7 @@
             String str = null;
             Result res = in.getNext(str);
             if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-            	res.result = CastUtils.stringToDouble((String)res.result);
+                res.result = CastUtils.stringToDouble((String)res.result);
             }
             return res;
         }
@@ -572,7 +575,7 @@
             String str = null;
             Result res = in.getNext(str);
             if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-            	res.result = CastUtils.stringToFloat((String)res.result);
+                res.result = CastUtils.stringToFloat((String)res.result);
             }
             return res;
         }
@@ -695,7 +698,6 @@
         }
 
         case DataType.CHARARRAY: {
-
             Result res = in.getNext(str);
 
             return res;
@@ -988,4 +990,19 @@
         return clone;
     }
 
+    /**
+     * Get the child expression of this cast: its first input, when that input is
+     * an ExpressionOperator. The list is empty when no such input is attached.
+     */
+    @Override
+    public List<ExpressionOperator> getChildExpressions() {
+        // Rebuilt on each call so an early call cannot pin an empty list for good.
+        child = new ArrayList<ExpressionOperator>(1);
+        // Guard: inputs is not guaranteed to be set or non-empty at this point.
+        if (inputs != null && inputs.size() > 0 && inputs.get(0) instanceof ExpressionOperator) {
+            child.add((ExpressionOperator) inputs.get(0));
+        }
+        return child;
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java Thu Nov 12 18:33:15 2009
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.pig.backend.executionengine.ExecException;
@@ -32,49 +33,49 @@
 import org.apache.pig.impl.plan.VisitorException;
 
 public class POMapLookUp extends ExpressionOperator {
-	
+    
     private static final long serialVersionUID = 1L;
     private String key;
 
-	public POMapLookUp(OperatorKey k) {
-		super(k);
-	}
-	
-	public POMapLookUp(OperatorKey k, int rp) {
-		super(k, rp);
-	}
-	
-	public POMapLookUp(OperatorKey k, int rp, String key) {
-		super(k, rp);
-		this.key = key;
-	}
-	
-	public void setLookUpKey(String key) {
-		this.key = key;
-	}
-	
-	public String getLookUpKey() {
-		return key;
-	}
-
-	@Override
-	public void visit(PhyPlanVisitor v) throws VisitorException {
-		v.visitMapLookUp(this);
-
-	}
-
-	@Override
-	public String name() {
-		return "POMapLookUp" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
-	}
-
-	@Override
-	public boolean supportsMultipleInputs() {
-		return false;
-	}
-	
-	@Override
-	public Result processInput() throws ExecException {
+    public POMapLookUp(OperatorKey k) {
+        super(k);
+    }
+    
+    public POMapLookUp(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+    
+    public POMapLookUp(OperatorKey k, int rp, String key) {
+        super(k, rp);
+        this.key = key; // the map key whose value getNext() extracts
+    }
+    
+    public void setLookUpKey(String key) {
+        this.key = key; // replaces the map key used by subsequent lookups
+    }
+    
+    public String getLookUpKey() {
+        return key;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitMapLookUp(this);
+
+    }
+
+    @Override
+    public String name() {
+        return "POMapLookUp" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+    
+    @Override
+    public Result processInput() throws ExecException {
         Result res = new Result();
         Map<String, Object> inpValue = null;
         if (input == null && (inputs == null || inputs.size()==0)) {
@@ -91,65 +92,65 @@
             return res;
         }
     }
-	
-	@SuppressWarnings("unchecked")
-	private Result getNext() throws ExecException {
-		Result res = processInput();
-		if(res.result != null && res.returnStatus == POStatus.STATUS_OK) {
-			res.result = ((Map<String, Object>)res.result).get(key);
-		}
-		return res;
-	}
-
-	@Override
-	public Result getNext(Boolean b) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(DataBag db) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(DataByteArray ba) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(Double d) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(Float f) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(Integer i) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(Long l) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(Map m) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(String s) throws ExecException {
-		return getNext();
-	}
-
-	@Override
-	public Result getNext(Tuple t) throws ExecException {
-		return getNext();
-	}
+    
+    @SuppressWarnings("unchecked")
+    private Result getNext() throws ExecException {
+        // Fetch the input map, then swap it for the value stored under the lookup key.
+        final Result lookup = processInput();
+        if (lookup.returnStatus != POStatus.STATUS_OK || lookup.result == null) return lookup;
+        lookup.result = ((Map<String, Object>) lookup.result).get(key);
+        return lookup;
+    }
+
+    @Override
+    public Result getNext(Boolean b) throws ExecException {
+        return getNext(); // the argument only selects the overload; every type shares one lookup
+    }
+
+    @Override
+    public Result getNext(DataBag db) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(DataByteArray ba) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Double d) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Float f) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Integer i) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Long l) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Map m) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(String s) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        return getNext();
+    }
 
     @Override
     public POMapLookUp clone() throws CloneNotSupportedException {
@@ -158,7 +159,12 @@
         clone.cloneHelper(this);
         return clone;
     }
-	
-	
+
+    @Override
+    public List<ExpressionOperator> getChildExpressions() {		
+        return null; // callers receive null here, not an empty list, and must check for it
+    }
+    
+    
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java Thu Nov 12 18:33:15 2009
@@ -60,7 +60,7 @@
     public Result getNext(Double d) throws ExecException {
         Result res = expr.getNext(d);
         if(res.returnStatus == POStatus.STATUS_OK && res.result!=null) {
-         		res.result = -1*((Double)res.result);
+                 res.result = -1*((Double)res.result);
 
        }
        

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java Thu Nov 12 18:33:15 2009
@@ -58,6 +58,11 @@
 
     @Override
     public Result getNext(Boolean b) throws ExecException {
+        Result r = accumChild(null, dummyBool);
+        if (r != null) {
+            return r;
+        }
+        
         Result left;
         left = lhs.getNext(dummyBool);
         // pass on ERROR and EOP

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Thu Nov 12 18:33:15 2009
@@ -19,6 +19,7 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.pig.PigException;
@@ -52,12 +53,12 @@
      */
     private static final long serialVersionUID = 1L;
 
-	private static TupleFactory tupleFactory = TupleFactory.getInstance();
-	
-	protected static final BagFactory bagFactory = BagFactory.getInstance();
+    private static TupleFactory tupleFactory = TupleFactory.getInstance();
+    
+    protected static final BagFactory bagFactory = BagFactory.getInstance();
 
-	private boolean resultSingleTupleBag = false;
-	
+    private boolean resultSingleTupleBag = false;
+    
     //The column to project
     protected ArrayList<Integer> columns;
     
@@ -162,7 +163,7 @@
                 ret = null;
             }
         } else {
-	        ArrayList<Object> objList = new ArrayList<Object>(columns.size()); 
+            ArrayList<Object> objList = new ArrayList<Object>(columns.size()); 
                 
             for(int i: columns) {
                 try { 
@@ -175,7 +176,7 @@
                     objList.add(null);
                 }
             }
-		    ret = tupleFactory.newTuple(objList);
+            ret = tupleFactory.newTuple(objList);
         }
         res.result = ret;
         return res;
@@ -294,12 +295,12 @@
             if(columns.size() == 1) {
                 ret = inpValue.get(columns.get(0));
             } else {
-	            ArrayList<Object> objList = new ArrayList<Object>(columns.size()); 
+                ArrayList<Object> objList = new ArrayList<Object>(columns.size()); 
                 
                 for(int i: columns) {
                    objList.add(inpValue.get(i)); 
                 }
-		        ret = tupleFactory.newTuple(objList);
+                ret = tupleFactory.newTuple(objList);
                 res.result = (Tuple)ret;
                 return res;
             }
@@ -413,4 +414,9 @@
         this.resultSingleTupleBag = resultSingleTupleBag;
     }
 
+    @Override
+    public List<ExpressionOperator> getChildExpressions() {
+        return null; // callers receive null here, not an empty list, and must check for it
+    }
+
 }