You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2010/02/11 23:12:43 UTC

svn commit: r909165 [1/6] - in /hadoop/pig/trunk: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/logical/ src/org/apache/pig/experimental/logical/expression/ src/org/apache/pig/experimental/logica...

Author: gates
Date: Thu Feb 11 22:12:36 2010
New Revision: 909165

URL: http://svn.apache.org/viewvc?rev=909165&view=rev
Log:
Checkin of Ankit's pig_1178.patch.

Added:
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/LogicalPlanMigrationVistor.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/CastExpression.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ExpToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/GreaterThanEqualExpression.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/GreaterThanExpression.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LessThanEqualExpression.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LessThanExpression.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/OrExpression.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/UnaryExpression.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/AllExpressionVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/AllSameVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/LogicalPlanOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/ProjectionPatcher.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/SchemaPatcher.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/UidStamper.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOFilter.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOForEach.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOJoin.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOStore.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MergeFilter.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/SplitFilter.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorSubPlan.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/SubtreeDependencyOrderWalker.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterRule.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java
Modified:
    hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/AndExpression.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/BinaryExpression.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ConstantExpression.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/EqualExpression.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpression.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionPlan.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ProjectExpression.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DepthFirstWalker.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanEdge.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Rule.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Transformer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/FileSpec.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalOperatorPlan.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalRule.java

Modified: hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/FuncSpec.java Thu Feb 11 22:12:36 2010
@@ -35,7 +35,7 @@
     String className = null;
     String[] ctorArgs = null;
     Schema inputArgsSchema = null;
-    
+   
     /**
      * @param className the name of the class for the udf
      * @param ctorArg the argument for the constructor for the above class
@@ -202,7 +202,34 @@
     public void setInputArgsSchema(Schema inputArgsSchema) {
         this.inputArgsSchema = inputArgsSchema;
     }
-
+    
+    @Override
+    public boolean equals(Object other) {
+        if (other != null && other instanceof FuncSpec) {
+            FuncSpec ofs = (FuncSpec)other;
+            if (!className.equals(ofs.className)) return false;
+            if (ctorArgs == null && ofs.ctorArgs != null ||
+                    ctorArgs != null && ofs.ctorArgs == null) {
+                return false;
+            }
+           
+            if (ctorArgs != null && ofs.ctorArgs != null) {
+                if (ctorArgs.length != ofs.ctorArgs.length) return false;
+                for (int i = 0; i < ctorArgs.length; i++) {
+                    if (!ctorArgs[i].equals(ofs.ctorArgs[i])) return false;
+                }
+            }
+            return Schema.equals(inputArgsSchema, ofs.inputArgsSchema, false, true);
+        } else {
+            return false;
+        }
+    }
+    
+    @Override
+    public int hashCode() {
+        return getClassName().hashCode() + ctorArgs.length;
+    }
+ 
     @Override
     public FuncSpec clone() throws CloneNotSupportedException {
         String[] args = null;

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Thu Feb 11 22:12:36 2010
@@ -382,7 +382,7 @@
      * @throws IOException
      */    
     public void registerQuery(String query, int startLine) throws IOException {            
-    	currDAG.registerQuery(query, startLine);
+        currDAG.registerQuery(query, startLine);
     }
  
     public LogicalPlan clonePlan(String alias) throws IOException {
@@ -828,11 +828,11 @@
         }
         
         if(aggregateWarning) {
-        	CompilationMessageCollector.logMessages(collector, MessageType.Warning, aggregateWarning, log);
+            CompilationMessageCollector.logMessages(collector, MessageType.Warning, aggregateWarning, log);
         } else {
-        	for(Enum type: MessageType.values()) {
-        		CompilationMessageCollector.logAllMessages(collector, log);
-        	}
+            for(Enum type: MessageType.values()) {
+                CompilationMessageCollector.logAllMessages(collector, log);
+            }
         }
         
         if (caught != null) {
@@ -840,7 +840,7 @@
         }
 
         // optimize
-        if (optimize) {
+        if (optimize && pigContext.getProperties().getProperty("pig.usenewlogicalplan", "false").equals("false")) {
             HashSet<String> optimizerRules = null;
             try {
                 optimizerRules = (HashSet<String>) ObjectSerializer
@@ -890,7 +890,7 @@
      * This class holds the internal states of a grunt shell session.
      */
     private class Graph {
-    	
+        
         private Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
         
         private Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Feb 11 22:12:36 2010
@@ -58,6 +58,8 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.experimental.logical.optimizer.UidStamper;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.tools.pigstats.PigStats;
 
@@ -253,12 +255,39 @@
         }
 
         try {
-            LogToPhyTranslationVisitor translator = 
-                new LogToPhyTranslationVisitor(plan);
-            translator.setPigContext(pigContext);
-            translator.visit();
-            return translator.getPhysicalPlan();
-        } catch (VisitorException ve) {
+            if (getConfiguration().getProperty("pig.usenewlogicalplan", "false").equals("true")) {
+                log.info("pig.usenewlogicalplan is set to true. New logical plan will be used.");
+                
+                // translate old logical plan to new plan
+                LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(plan);
+                visitor.visit();
+                org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+                
+                // set uids
+                UidStamper stamper = new UidStamper(newPlan);
+                stamper.visit();
+                
+                // run optimizer
+                org.apache.pig.experimental.logical.optimizer.LogicalPlanOptimizer optimizer = 
+                    new org.apache.pig.experimental.logical.optimizer.LogicalPlanOptimizer(newPlan, 100);
+                optimizer.optimize();
+                
+                // translate new logical plan to physical plan
+                org.apache.pig.experimental.logical.relational.LogToPhyTranslationVisitor translator = 
+                    new org.apache.pig.experimental.logical.relational.LogToPhyTranslationVisitor(newPlan);
+                
+                translator.setPigContext(pigContext);
+                translator.visit();
+                return translator.getPhysicalPlan();
+                
+            }else{       
+                LogToPhyTranslationVisitor translator = 
+                    new LogToPhyTranslationVisitor(plan);
+                translator.setPigContext(pigContext);
+                translator.visit();
+                return translator.getPhysicalPlan();
+            }
+        } catch (Exception ve) {
             int errCode = 2042;
             String msg = "Internal error. Unable to translate logical plan to physical plan.";
             throw new ExecException(msg, errCode, PigException.BUG, ve);

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/LogicalPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/LogicalPlanMigrationVistor.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/LogicalPlanMigrationVistor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/LogicalPlanMigrationVistor.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.CastExpression;
+import org.apache.pig.experimental.logical.expression.ConstantExpression;
+import org.apache.pig.experimental.logical.expression.EqualExpression;
+import org.apache.pig.experimental.logical.expression.GreaterThanEqualExpression;
+import org.apache.pig.experimental.logical.expression.GreaterThanExpression;
+import org.apache.pig.experimental.logical.expression.LessThanEqualExpression;
+import org.apache.pig.experimental.logical.expression.LessThanExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.OrExpression;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LOInnerLoad;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.ExpressionOperator;
+import org.apache.pig.impl.logicalLayer.LOAdd;
+import org.apache.pig.impl.logicalLayer.LOAnd;
+import org.apache.pig.impl.logicalLayer.LOBinCond;
+import org.apache.pig.impl.logicalLayer.LOCast;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOConst;
+import org.apache.pig.impl.logicalLayer.LOCross;
+import org.apache.pig.impl.logicalLayer.LODistinct;
+import org.apache.pig.impl.logicalLayer.LODivide;
+import org.apache.pig.impl.logicalLayer.LOEqual;
+import org.apache.pig.impl.logicalLayer.LOFilter;
+import org.apache.pig.impl.logicalLayer.LOForEach;
+import org.apache.pig.impl.logicalLayer.LOGenerate;
+import org.apache.pig.impl.logicalLayer.LOGreaterThan;
+import org.apache.pig.impl.logicalLayer.LOGreaterThanEqual;
+import org.apache.pig.impl.logicalLayer.LOIsNull;
+import org.apache.pig.impl.logicalLayer.LOJoin;
+import org.apache.pig.impl.logicalLayer.LOLesserThan;
+import org.apache.pig.impl.logicalLayer.LOLesserThanEqual;
+import org.apache.pig.impl.logicalLayer.LOLimit;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOMapLookup;
+import org.apache.pig.impl.logicalLayer.LOMod;
+import org.apache.pig.impl.logicalLayer.LOMultiply;
+import org.apache.pig.impl.logicalLayer.LONegative;
+import org.apache.pig.impl.logicalLayer.LONot;
+import org.apache.pig.impl.logicalLayer.LONotEqual;
+import org.apache.pig.impl.logicalLayer.LOOr;
+import org.apache.pig.impl.logicalLayer.LOProject;
+import org.apache.pig.impl.logicalLayer.LORegexp;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.impl.logicalLayer.LOStream;
+import org.apache.pig.impl.logicalLayer.LOSubtract;
+import org.apache.pig.impl.logicalLayer.LOUnion;
+import org.apache.pig.impl.logicalLayer.LOUserFunc;
+import org.apache.pig.impl.logicalLayer.LOVisitor;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LOJoin.JOINTYPE;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+
+public class LogicalPlanMigrationVistor extends LOVisitor { 
+    private org.apache.pig.experimental.logical.relational.LogicalPlan logicalPlan;
+    private HashMap<LogicalOperator, LogicalRelationalOperator> opsMap;
+   
+    public LogicalPlanMigrationVistor(LogicalPlan plan) {
+        super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
+        logicalPlan = new org.apache.pig.experimental.logical.relational.LogicalPlan();
+        opsMap = new HashMap<LogicalOperator, LogicalRelationalOperator>();
+    }    
+    
+    private LogicalSchema translateSchema(Schema schema) {    	
+        if (schema == null) {
+            return null;
+        }
+        
+        LogicalSchema s2 = new LogicalSchema();
+        List<Schema.FieldSchema> ll = schema.getFields();
+        for (Schema.FieldSchema f: ll) {
+            LogicalSchema.LogicalFieldSchema f2 = 
+                new LogicalSchema.LogicalFieldSchema(f.alias, translateSchema(f.schema), f.type);
+                       
+            s2.addField(f2);
+        }
+        
+        return s2;
+    }
+    
+    private void translateConnection(LogicalOperator oldOp, org.apache.pig.experimental.plan.Operator newOp) {       
+        List<LogicalOperator> preds = mPlan.getPredecessors(oldOp); 
+        
+        if(preds != null) {            
+            for(LogicalOperator pred: preds) {
+                org.apache.pig.experimental.plan.Operator newPred = opsMap.get(pred);
+                newOp.getPlan().connect(newPred, newOp);                 
+            }
+        }        
+    }      
+    
+    private LogicalExpressionPlan translateExpressionPlan(LogicalPlan lp) throws VisitorException {
+        PlanWalker<LogicalOperator, LogicalPlan> childWalker = 
+            new DependencyOrderWalker<LogicalOperator, LogicalPlan>(lp);
+        
+        LogicalExpPlanMigrationVistor childPlanVisitor = new LogicalExpPlanMigrationVistor(lp);
+        
+        childWalker.walk(childPlanVisitor);
+        return childPlanVisitor.exprPlan;
+    }
+      
+    public org.apache.pig.experimental.logical.relational.LogicalPlan getNewLogicalPlan() {
+        return logicalPlan;
+    }
+    
+    public void visit(LOCogroup cg) throws VisitorException {
+        throw new VisitorException("LOCogroup is not supported.");
+    }
+
+    public void visit(LOJoin loj) throws VisitorException {
+        // List of join predicates 
+        List<LogicalOperator> inputs = loj.getInputs();
+        
+        // mapping of inner plans for each input
+        MultiMap<Integer, LogicalExpressionPlan> joinPlans = 
+                        new MultiMap<Integer, LogicalExpressionPlan>();
+       
+        for (int i=0; i<inputs.size(); i++) {
+            List<LogicalPlan> plans = (List<LogicalPlan>) loj.getJoinPlans().get(inputs.get(i));
+            for (LogicalPlan lp : plans) {                               
+                joinPlans.put(i, translateExpressionPlan(lp));
+            }        
+        }
+        
+        JOINTYPE type = loj.getJoinType();
+        org.apache.pig.experimental.logical.relational.LOJoin.JOINTYPE newType = org.apache.pig.experimental.logical.relational.LOJoin.JOINTYPE.HASH;;
+        switch(type) {        
+        case REPLICATED:
+            newType = org.apache.pig.experimental.logical.relational.LOJoin.JOINTYPE.REPLICATED;
+            break;        	
+        case SKEWED:
+            newType = org.apache.pig.experimental.logical.relational.LOJoin.JOINTYPE.SKEWED;
+            break;
+        case MERGE:
+            newType = org.apache.pig.experimental.logical.relational.LOJoin.JOINTYPE.MERGE;
+            break;        
+        }
+        
+        boolean[] isInner = loj.getInnerFlags();
+        org.apache.pig.experimental.logical.relational.LOJoin join = 
+            new org.apache.pig.experimental.logical.relational.LOJoin(logicalPlan, joinPlans, newType, isInner);
+     
+        join.setAlias(loj.getAlias());
+        join.setRequestedParallelism(loj.getRequestedParallelism());
+        
+        logicalPlan.add(join);
+        opsMap.put(loj, join);       
+        translateConnection(loj, join);           
+    }
+
+    public void visit(LOForEach forEach) throws VisitorException {
+        
+        org.apache.pig.experimental.logical.relational.LOForEach newForeach = 
+                new org.apache.pig.experimental.logical.relational.LOForEach(logicalPlan);
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = 
+            new org.apache.pig.experimental.logical.relational.LogicalPlan();
+        
+        newForeach.setInnerPlan(innerPlan);
+        
+        List<LogicalExpressionPlan> expPlans = new ArrayList<LogicalExpressionPlan>();
+        
+        List<Boolean> fl = forEach.getFlatten();
+        boolean[] flat = new boolean[fl.size()];
+        for(int i=0; i<fl.size(); i++) {
+            flat[i] = fl.get(i);
+        }
+        org.apache.pig.experimental.logical.relational.LOGenerate gen = 
+            new org.apache.pig.experimental.logical.relational.LOGenerate(innerPlan, expPlans, flat);
+        
+        innerPlan.add(gen);                
+        
+        List<LogicalPlan> ll = forEach.getForEachPlans();
+        for(int i=0; i<ll.size(); i++) {
+            LogicalPlan lp = ll.get(i);
+            ForeachInnerPlanVisitor v = new ForeachInnerPlanVisitor(newForeach, forEach, lp);
+            v.visit();
+                        
+            innerPlan.connect(v.lastOp, gen);                       
+            expPlans.add(v.exprPlan);
+        }
+        
+        newForeach.setAlias(forEach.getAlias());
+        newForeach.setRequestedParallelism(forEach.getRequestedParallelism());
+        
+        logicalPlan.add(newForeach);
+        opsMap.put(forEach, newForeach);       
+        translateConnection(forEach, newForeach);     
+    }
+
+    public void visit(LOSort s) throws VisitorException {
+        throw new VisitorException("LOSort is not supported.");
+    }
+
+    public void visit(LOLimit limOp) throws VisitorException {
+        throw new VisitorException("LOLimit is not supported.");
+    }
+    
+    public void visit(LOStream stream) throws VisitorException {
+        throw new VisitorException("LOStream is not supported.");
+    }
+    
+    public void visit(LOFilter filter) throws VisitorException {
+        org.apache.pig.experimental.logical.relational.LOFilter newFilter = new org.apache.pig.experimental.logical.relational.LOFilter(logicalPlan);
+        
+        LogicalPlan filterPlan = filter.getComparisonPlan();
+        LogicalExpressionPlan newFilterPlan = translateExpressionPlan(filterPlan);
+      
+        newFilter.setFilterPlan(newFilterPlan);
+        newFilter.setAlias(filter.getAlias());
+        newFilter.setRequestedParallelism(filter.getRequestedParallelism());
+        
+        logicalPlan.add(newFilter);
+        opsMap.put(filter, newFilter);       
+        translateConnection(filter, newFilter);
+    }
+
+    public void visit(LOSplit split) throws VisitorException {
+        throw new VisitorException("LOSplit is not supported.");
+    }
+
+
+    public void visit(LOGenerate g) throws VisitorException {
+        throw new VisitorException("LOGenerate is not supported.");
+    }
+    
+    public void visit(LOLoad load) throws VisitorException{      
+        FileSpec fs = load.getInputFile();
+        
+        LogicalSchema s = null;
+        try {
+            s = translateSchema(load.getSchema());
+        }catch(Exception e) {
+            throw new VisitorException("Failed to translate schema.", e);
+        }
+        
+        org.apache.pig.experimental.logical.relational.LOLoad ld = 
+            new org.apache.pig.experimental.logical.relational.LOLoad(fs, s, logicalPlan);
+        
+        ld.setAlias(load.getAlias());
+        ld.setRequestedParallelism(load.getRequestedParallelism());
+        
+        logicalPlan.add(ld);        
+        opsMap.put(load, ld);
+        translateConnection(load, ld);
+    }
+    
+
+    public void visit(LOStore store) throws VisitorException{
+        org.apache.pig.experimental.logical.relational.LOStore newStore = 
+                new org.apache.pig.experimental.logical.relational.LOStore(logicalPlan, store.getOutputFile());    	
+       
+        newStore.setAlias(store.getAlias());
+        newStore.setRequestedParallelism(store.getRequestedParallelism());
+        
+        logicalPlan.add(newStore);
+        opsMap.put(store, newStore);       
+        translateConnection(store, newStore);
+    }    
+
+    public void visit(LOUnion u) throws VisitorException {
+        throw new VisitorException("LOUnion is not supported.");
+    }
+
+    public void visit(LOSplitOutput sop) throws VisitorException {
+        throw new VisitorException("LOSplitOutput is not supported.");
+    }
+
+    public void visit(LODistinct dt) throws VisitorException {
+        throw new VisitorException("LODistinct is not supported.");
+    }
+
+    public void visit(LOCross cs) throws VisitorException {
+        throw new VisitorException("LOCross is not supported.");
+    }
+    
+    public class LogicalExpPlanMigrationVistor extends LOVisitor { 
+        
+        protected org.apache.pig.experimental.logical.expression.LogicalExpressionPlan exprPlan;
+        protected HashMap<LogicalOperator, LogicalExpression> exprOpsMap;
+        protected LogicalPlan oldLogicalPlan;
+        
+        public LogicalExpPlanMigrationVistor(LogicalPlan plan) {
+            super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
+            exprPlan = new org.apache.pig.experimental.logical.expression.LogicalExpressionPlan();
+            exprOpsMap = new HashMap<LogicalOperator, LogicalExpression>();
+            
+            oldLogicalPlan = LogicalPlanMigrationVistor.this.mPlan;
+        }    
+
+        private void translateConnection(LogicalOperator oldOp, org.apache.pig.experimental.plan.Operator newOp) {       
+           List<LogicalOperator> preds = mPlan.getPredecessors(oldOp); 
+           
+           // the dependency relationship of new expression plan is opposite to the old logical plan
+           // for example, a+b, in old plan, "add" is a leave, and "a" and "b" are roots
+           // in new plan, "add" is root, and "a" and "b" are leaves.
+           if(preds != null) {            
+               for(LogicalOperator pred: preds) {
+                   org.apache.pig.experimental.plan.Operator newPred = exprOpsMap.get(pred);
+                   newOp.getPlan().connect(newOp, newPred);                 
+               }
+           }        
+       }
+        
+        public void visit(LOProject project) throws VisitorException {
+            int col = project.getCol();
+            LogicalOperator lg = project.getExpression();
+            LogicalOperator succed = oldLogicalPlan.getSuccessors(lg).get(0);
+            int input = oldLogicalPlan.getPredecessors(succed).indexOf(lg);
+                        
+            // get data type of projection
+            byte t = DataType.BYTEARRAY;
+            try {
+                Schema s = lg.getSchema();
+                if (s != null) {
+                    t = s.getField(col).type;
+                }
+            }catch(Exception e) {
+                throw new VisitorException(e);
+            }
+            ProjectExpression pe = new ProjectExpression(exprPlan, t, input, col);          
+            
+            exprPlan.add(pe);
+            exprOpsMap.put(project, pe);       
+            translateConnection(project, pe);                       
+        }
+        
+        public void visit(LOConst con) throws VisitorException{
+
+            ConstantExpression ce = new ConstantExpression(exprPlan, con.getType(), con.getValue());
+            
+             exprPlan.add(ce);
+             exprOpsMap.put(con, ce);       
+             translateConnection(con, ce);
+        }
+        
+        public void visit(LOGreaterThan op) throws VisitorException {
+            ExpressionOperator left = op.getLhsOperand();
+            ExpressionOperator right = op.getRhsOperand();
+                    
+            GreaterThanExpression eq = new GreaterThanExpression
+            (exprPlan, exprOpsMap.get(left), exprOpsMap.get(right));
+            exprOpsMap.put(op, eq);
+        }
+
+        public void visit(LOLesserThan op) throws VisitorException {
+            ExpressionOperator left = op.getLhsOperand();
+            ExpressionOperator right = op.getRhsOperand();
+                    
+            LessThanExpression eq = new LessThanExpression
+            (exprPlan, exprOpsMap.get(left), exprOpsMap.get(right));
+            exprOpsMap.put(op, eq);
+        }
+
+        public void visit(LOGreaterThanEqual op) throws VisitorException {
+            ExpressionOperator left = op.getLhsOperand();
+            ExpressionOperator right = op.getRhsOperand();
+                    
+            GreaterThanEqualExpression eq = new GreaterThanEqualExpression
+            (exprPlan, exprOpsMap.get(left), exprOpsMap.get(right));
+            exprOpsMap.put(op, eq);
+        }
+
+        public void visit(LOLesserThanEqual op) throws VisitorException {
+            ExpressionOperator left = op.getLhsOperand();
+            ExpressionOperator right = op.getRhsOperand();
+                    
+            LessThanEqualExpression eq = new LessThanEqualExpression
+            (exprPlan, exprOpsMap.get(left), exprOpsMap.get(right));
+            exprOpsMap.put(op, eq);        }
+
+        public void visit(LOEqual op) throws VisitorException {		
+            ExpressionOperator left = op.getLhsOperand();
+            ExpressionOperator right = op.getRhsOperand();
+                    
+            EqualExpression eq = new EqualExpression(exprPlan, exprOpsMap.get(left), exprOpsMap.get(right));
+            exprOpsMap.put(op, eq);
+        }
+
+        public void visit(LOUserFunc func) throws VisitorException {
+            throw new VisitorException("LOUserFunc is not supported.");
+        }
+
+        public void visit(LOBinCond binCond) throws VisitorException {
+            throw new VisitorException("LOBinCond is not supported.");
+        }
+
+        public void visit(LOCast cast) throws VisitorException {
+            byte b = cast.getType();
+            ExpressionOperator exp = cast.getExpression();
+            
+            CastExpression c = new CastExpression(exprPlan, b, exprOpsMap.get(exp));
+            c.setFuncSpec(cast.getLoadFuncSpec());
+            exprOpsMap.put(cast, c);
+        }
+        
+        public void visit(LORegexp regexp) throws VisitorException {
+            throw new VisitorException("LORegexp is not supported.");
+        }
+
+        public void visit(LONotEqual op) throws VisitorException {
+            throw new VisitorException("LONotEqual is not supported.");
+        }
+
+        public void visit(LOAdd op) throws VisitorException {		
+            throw new VisitorException("LOAdd is not supported.");
+        }
+
+        public void visit(LOSubtract op) throws VisitorException {
+            throw new VisitorException("LOSubtract is not supported.");
+        }
+
+        public void visit(LOMultiply op) throws VisitorException {
+            throw new VisitorException("LOMultiply is not supported.");
+        }
+
+        public void visit(LODivide op) throws VisitorException {
+            throw new VisitorException("LODivide is not supported.");
+        }
+
+        public void visit(LOMod op) throws VisitorException {
+            throw new VisitorException("LOMod is not supported.");
+        }
+
+        
+        public void visit(LONegative op) throws VisitorException {
+            throw new VisitorException("LONegative is not supported.");
+        }
+
+        public void visit(LOMapLookup op) throws VisitorException {
+            throw new VisitorException("LOMapLookup is not supported.");
+        }
+
+        public void visit(LOAnd binOp) throws VisitorException {
+            ExpressionOperator left = binOp.getLhsOperand();
+            ExpressionOperator right = binOp.getRhsOperand();
+                    
+            AndExpression ae = new AndExpression(exprPlan, exprOpsMap.get(left), exprOpsMap.get(right));
+            exprOpsMap.put(binOp, ae);            
+        }
+
+        public void visit(LOOr binOp) throws VisitorException {
+            ExpressionOperator left = binOp.getLhsOperand();
+            ExpressionOperator right = binOp.getRhsOperand();
+                    
+            OrExpression ae = new OrExpression(exprPlan, exprOpsMap.get(left), exprOpsMap.get(right));
+            exprOpsMap.put(binOp, ae);
+        }
+
+        public void visit(LONot uniOp) throws VisitorException {
+            throw new VisitorException("LONot is not supported.");
+        }
+
+        public void visit(LOIsNull uniOp) throws VisitorException {
+            throw new VisitorException("LOIsNull is not supported.");
+        }
+    }
+    
+    // visitor to translate the inner plan of foreach
+    // it has all the operators allowed in the inner plan of foreach
+    public class ForeachInnerPlanVisitor extends LogicalExpPlanMigrationVistor {
+        private org.apache.pig.experimental.logical.relational.LOForEach foreach;
+        private LOForEach oldForeach;
+        private org.apache.pig.experimental.logical.relational.LOGenerate gen;
+        private org.apache.pig.experimental.logical.relational.LogicalPlan newInnerPlan;
+               
+        private HashMap<LogicalOperator, LogicalRelationalOperator> innerOpsMap;
+        private LogicalRelationalOperator lastOp;
+
+        public ForeachInnerPlanVisitor(org.apache.pig.experimental.logical.relational.LOForEach foreach, LOForEach oldForeach, LogicalPlan plan) {
+            super(plan);	
+            this.foreach = foreach;
+            newInnerPlan = foreach.getInnerPlan();
+            gen = (org.apache.pig.experimental.logical.relational.LOGenerate)newInnerPlan.getSinks().get(0);
+            this.oldForeach = oldForeach;
+                        
+            innerOpsMap = new HashMap<LogicalOperator, LogicalRelationalOperator>();
+        }      
+        
+        public void visit(LOProject project) throws VisitorException {
+            LogicalOperator op = project.getExpression();
+            
+            if (op == oldLogicalPlan.getPredecessors(oldForeach).get(0)) {
+                // if this projection is to get a field from outer plan, change it
+                // to LOInnerLoad
+                
+                LOInnerLoad innerLoad = new LOInnerLoad(newInnerPlan, foreach, project.getCol());               
+                newInnerPlan.add(innerLoad);
+                
+                List<LogicalOperator> ll = mPlan.getSuccessors(project);
+                if (ll == null || ll.get(0) instanceof ExpressionOperator || project.isStar()) {  
+                    int size = 0;
+                    try {
+                        List<org.apache.pig.experimental.plan.Operator> suc = newInnerPlan.getPredecessors(gen);
+                        if (suc != null) {
+                            size = suc.size();
+                        }
+                    }catch(Exception e) {
+                        throw new VisitorException(e);
+                    }
+                     
+                    lastOp = innerLoad;
+                    
+                    ProjectExpression pe = new ProjectExpression(exprPlan, project.getType(), size, 0);                              
+                    exprPlan.add(pe);
+                    exprOpsMap.put(project, pe);       
+                    translateConnection(project, pe);            
+                } else {
+                    innerOpsMap.put(project, innerLoad);
+                }
+            } else {
+                super.visit(project);
+            }
+        }       
+        
+        public void visit(LOForEach foreach) throws VisitorException {
+            throw new VisitorException("LOForEach is not supported as inner plan of foreach");
+        }
+        
+        public void visit(LOSort s) throws VisitorException {
+            throw new VisitorException("LOSort is not supported as inner plan of foreach.");
+        }
+
+        public void visit(LOLimit limOp) throws VisitorException {
+            throw new VisitorException("LOLimit is not supported as inner plan of foreach.");
+        }
+        
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/AndExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/AndExpression.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/AndExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/AndExpression.java Thu Feb 11 22:12:36 2010
@@ -18,7 +18,10 @@
 
 package org.apache.pig.experimental.logical.expression;
 
+import java.io.IOException;
+
 import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.experimental.plan.OperatorPlan;
 import org.apache.pig.experimental.plan.PlanVisitor;
 
@@ -44,11 +47,24 @@
      * @link org.apache.pig.experimental.plan.Operator#accept(org.apache.pig.experimental.plan.PlanVisitor)
      */
     @Override
-    public void accept(PlanVisitor v) {
+    public void accept(PlanVisitor v) throws IOException {
         if (!(v instanceof LogicalExpressionVisitor)) {
-            throw new RuntimeException("Expected LogicalExpressionVisitor");
+            throw new IOException("Expected LogicalExpressionVisitor");
         }
         ((LogicalExpressionVisitor)v).visitAnd(this);
     }
-
+    
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other != null && other instanceof AndExpression) {
+            AndExpression ao = (AndExpression)other;
+            try {
+                return ao.getLhs().isEqual(getLhs()) && ao.getRhs().isEqual(getRhs());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            return false;
+        }
+    }    
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/BinaryExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/BinaryExpression.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/BinaryExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/BinaryExpression.java Thu Feb 11 22:12:36 2010
@@ -54,8 +54,7 @@
      * @throws IOException 
      */
     public LogicalExpression getLhs() throws IOException {
-        return (LogicalExpression)plan.getSuccessors(this).get(0);
-        
+        return (LogicalExpression)plan.getSuccessors(this).get(0);        
     }
 
     /**
@@ -67,5 +66,4 @@
         return (LogicalExpression)plan.getSuccessors(this).get(1);
         
     }
-
 }

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/CastExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/CastExpression.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/CastExpression.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/CastExpression.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.expression;
+
+import java.io.IOException;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+public class CastExpression extends UnaryExpression {
+    private FuncSpec castFunc;
+
+    public CastExpression(OperatorPlan plan, byte b, LogicalExpression exp) {
+        super("Cast", plan, b, exp);		
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalExpressionVisitor)) {
+            throw new IOException("Expected LogicalExpressionVisitor");
+        }
+        ((LogicalExpressionVisitor)v).visitCast(this);
+    }
+
+    /**
+     * Set the <code>FuncSpec</code> that performs the casting functionality
+     * @param spec the <code>FuncSpec</code> that does the casting
+     */
+    public void setFuncSpec(FuncSpec spec) {
+        castFunc = spec;
+    }
+    
+    /**
+     * Get the <code>FuncSpec</code> that performs the casting functionality
+     * @return the <code>FuncSpec</code> that does the casting
+     */
+    public FuncSpec getFuncSpec() {
+        return castFunc;
+    }
+
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other != null && other instanceof CastExpression) { 
+            CastExpression of = (CastExpression)other;
+            try {
+                return plan.isEqual(of.plan) && getExpression().isEqual( of.getExpression() );
+            } catch (IOException e) {
+                return false;
+            }
+        } else {
+            return false;
+        }
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ConstantExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ConstantExpression.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ConstantExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ConstantExpression.java Thu Feb 11 22:12:36 2010
@@ -18,6 +18,9 @@
 
 package org.apache.pig.experimental.logical.expression;
 
+import java.io.IOException;
+
+import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.experimental.plan.OperatorPlan;
 import org.apache.pig.experimental.plan.PlanVisitor;
 
@@ -48,9 +51,9 @@
      * @link org.apache.pig.experimental.plan.Operator#accept(org.apache.pig.experimental.plan.PlanVisitor)
      */
     @Override
-    public void accept(PlanVisitor v) {
+    public void accept(PlanVisitor v) throws IOException {
         if (!(v instanceof LogicalExpressionVisitor)) {
-            throw new RuntimeException("Expected LogicalExpressionVisitor");
+            throw new IOException("Expected LogicalExpressionVisitor");
         }
         ((LogicalExpressionVisitor)v).visitConstant(this);
 
@@ -63,5 +66,15 @@
     public Object getValue() {
         return val;
     }
-
+    
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other != null && other instanceof ConstantExpression) {
+            ConstantExpression co = (ConstantExpression)other;
+            return co.type == type && ( ( co.val == null && val == null ) 
+                    || ( co != null && co.val.equals(val) ) );
+        } else {
+            return false;
+        }
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/EqualExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/EqualExpression.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/EqualExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/EqualExpression.java Thu Feb 11 22:12:36 2010
@@ -18,7 +18,10 @@
 
 package org.apache.pig.experimental.logical.expression;
 
+import java.io.IOException;
+
 import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.experimental.plan.OperatorPlan;
 import org.apache.pig.experimental.plan.PlanVisitor;
 
@@ -44,12 +47,26 @@
      * @link org.apache.pig.experimental.plan.Operator#accept(org.apache.pig.experimental.plan.PlanVisitor)
      */
     @Override
-    public void accept(PlanVisitor v) {
+    public void accept(PlanVisitor v) throws IOException {
         if (!(v instanceof LogicalExpressionVisitor)) {
-            throw new RuntimeException(
-                "Expected LogicalExpressionVisitor");
+            throw new IOException("Expected LogicalExpressionVisitor");
         }
         ((LogicalExpressionVisitor)v).visitEqual(this);
     }
-
+    
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other != null && other instanceof EqualExpression) {
+            EqualExpression eo = (EqualExpression)other;
+            try {
+                return eo.getLhs().isEqual(
+                        getLhs()) && 
+                eo.getRhs().isEqual(getRhs());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            return false;
+        }
+     }
 }

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ExpToPhyTranslationVisitor.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ExpToPhyTranslationVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ExpToPhyTranslationVisitor.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.expression;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryComparisonOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POOr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.plan.DependencyOrderWalker;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanWalker;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+
+public class ExpToPhyTranslationVisitor extends LogicalExpressionVisitor {
+
+    // This value points to the current LogicalRelationalOperator we are working on
+    protected LogicalRelationalOperator currentOp;
+    
+    public ExpToPhyTranslationVisitor(OperatorPlan plan, LogicalRelationalOperator op, PhysicalPlan phyPlan, Map<Operator, PhysicalOperator> map) {
+        super(plan, new DependencyOrderWalker(plan));
+        currentOp = op;
+        logToPhyMap = map;
+        currentPlan = phyPlan;
+        currentPlans = new Stack<PhysicalPlan>();
+    }
+    
+    public ExpToPhyTranslationVisitor(OperatorPlan plan, PlanWalker walker, LogicalRelationalOperator op, PhysicalPlan phyPlan, Map<Operator, PhysicalOperator> map) {
+        super(plan, walker);
+        currentOp = op;
+        logToPhyMap = map;
+        currentPlan = phyPlan;
+        currentPlans = new Stack<PhysicalPlan>();
+    }
+    
+    protected Map<Operator, PhysicalOperator> logToPhyMap;
+
+    protected Stack<PhysicalPlan> currentPlans;
+
+    protected PhysicalPlan currentPlan;
+
+    protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
+
+    protected PigContext pc;
+    
+    public void setPigContext(PigContext pc) {
+        this.pc = pc;
+    }
+
+    public PhysicalPlan getPhysicalPlan() {
+        return currentPlan;
+    }
+
+    @Override
+    public void visitAnd( AndExpression op ) throws IOException {
+        String scope = DEFAULT_SCOPE;
+//        System.err.println("Entering And");
+        BinaryComparisonOperator exprOp = new POAnd(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+        // We dont have aliases in ExpressionOperators
+        // exprOp.setAlias(op.getAlias());
+        exprOp.setLhs((ExpressionOperator)logToPhyMap.get(op.getLhs()));
+        exprOp.setRhs((ExpressionOperator)logToPhyMap.get(op.getRhs()));
+        OperatorPlan oPlan = op.getPlan();
+        
+        currentPlan.add(exprOp);
+        logToPhyMap.put(op, exprOp);
+        
+        List<Operator> successors = oPlan.getSuccessors(op);
+        if(successors == null) return;
+        for(Operator lo : successors) {
+            PhysicalOperator from = logToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+        }
+//        System.err.println("Exiting And");
+    }
+    
+    @Override
+    public void visitOr( OrExpression op ) throws IOException {
+        String scope = DEFAULT_SCOPE;
+//        System.err.println("Entering Or");
+        BinaryComparisonOperator exprOp = new POOr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+        // We dont have aliases in ExpressionOperators
+        // exprOp.setAlias(op.getAlias());
+        exprOp.setLhs((ExpressionOperator)logToPhyMap.get(op.getLhs()));
+        exprOp.setRhs((ExpressionOperator)logToPhyMap.get(op.getRhs()));
+        OperatorPlan oPlan = op.getPlan();
+        
+        currentPlan.add(exprOp);
+        logToPhyMap.put(op, exprOp);
+        
+        List<Operator> successors = oPlan.getSuccessors(op);
+        if(successors == null) return;
+        for(Operator lo : successors) {
+            PhysicalOperator from = logToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+        }
+//        System.err.println("Exiting Or");
+    }
+    
+    @Override
+    public void visitEqual( EqualExpression op ) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        BinaryComparisonOperator exprOp = new EqualToExpr(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)));
+        // We dont have aliases in ExpressionOperators
+        // exprOp.setAlias(op.getAlias());
+        exprOp.setOperandType(op.getLhs().getType());
+        exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs()));
+        exprOp.setRhs((ExpressionOperator) logToPhyMap.get(op.getRhs()));
+        OperatorPlan oPlan = op.getPlan();
+        
+        currentPlan.add(exprOp);
+        logToPhyMap.put(op, exprOp);
+
+        List<Operator> successors = oPlan.getSuccessors(op);
+        if (successors == null) {
+            return;
+        }
+        for (Operator lo : successors) {
+            PhysicalOperator from = logToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+        }
+    }
+    
+    @Override
+    public void visitGreaterThan( GreaterThanExpression op ) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        BinaryComparisonOperator exprOp = new GreaterThanExpr(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)));
+        // We dont have aliases in ExpressionOperators
+        // exprOp.setAlias(op.getAlias());
+        exprOp.setOperandType(op.getLhs().getType());
+        exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs()));
+        exprOp.setRhs((ExpressionOperator) logToPhyMap.get(op.getRhs()));
+        OperatorPlan oPlan = op.getPlan();
+        
+        currentPlan.add(exprOp);
+        logToPhyMap.put(op, exprOp);
+
+        List<Operator> successors = oPlan.getSuccessors(op);
+        if (successors == null) {
+            return;
+        }
+        for (Operator lo : successors) {
+            PhysicalOperator from = logToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+        }
+    }
+    
+    @Override
+    public void visitGreaterThanEqual( GreaterThanEqualExpression op ) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        BinaryComparisonOperator exprOp = new LessThanExpr(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)));
+        // We dont have aliases in ExpressionOperators
+        // exprOp.setAlias(op.getAlias());
+        exprOp.setOperandType(op.getLhs().getType());
+        exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs()));
+        exprOp.setRhs((ExpressionOperator) logToPhyMap.get(op.getRhs()));
+        OperatorPlan oPlan = op.getPlan();
+        
+        currentPlan.add(exprOp);
+        logToPhyMap.put(op, exprOp);
+
+        List<Operator> successors = oPlan.getSuccessors(op);
+        if (successors == null) {
+            return;
+        }
+        for (Operator lo : successors) {
+            PhysicalOperator from = logToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+        }
+    }
+    
+    @Override
+    public void visitLessThan( LessThanExpression op ) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        BinaryComparisonOperator exprOp = new LessThanExpr(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)));
+        // We dont have aliases in ExpressionOperators
+        // exprOp.setAlias(op.getAlias());
+        exprOp.setOperandType(op.getLhs().getType());
+        exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs()));
+        exprOp.setRhs((ExpressionOperator) logToPhyMap.get(op.getRhs()));
+        OperatorPlan oPlan = op.getPlan();
+        
+        currentPlan.add(exprOp);
+        logToPhyMap.put(op, exprOp);
+
+        List<Operator> successors = oPlan.getSuccessors(op);
+        if (successors == null) {
+            return;
+        }
+        for (Operator lo : successors) {
+            PhysicalOperator from = logToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+        }
+    }
+    
+    
+    @Override
+    public void visitLessThanEqual( LessThanEqualExpression op ) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        BinaryComparisonOperator exprOp = new LessThanExpr(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)));
+        // We dont have aliases in ExpressionOperators
+        // exprOp.setAlias(op.getAlias());
+        exprOp.setOperandType(op.getLhs().getType());
+        exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs()));
+        exprOp.setRhs((ExpressionOperator) logToPhyMap.get(op.getRhs()));
+        OperatorPlan oPlan = op.getPlan();
+        
+        currentPlan.add(exprOp);
+        logToPhyMap.put(op, exprOp);
+
+        List<Operator> successors = oPlan.getSuccessors(op);
+        if (successors == null) {
+            return;
+        }
+        for (Operator lo : successors) {
+            PhysicalOperator from = logToPhyMap.get(lo);
+            try {
+                currentPlan.connect(from, exprOp);
+            } catch (PlanException e) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+        }
+    }
+    
+    @Override
+    public void visitProject(ProjectExpression op) throws IOException {
+        String scope = DEFAULT_SCOPE;
+//        System.err.println("Entering Project");
+        POProject exprOp;
+       
+        if(op.getType() == DataType.BAG) {
+            exprOp = new PORelationToExprProject(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)));
+         } else {
+            exprOp = new POProject(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)));
+        }
+        // We dont have aliases in ExpressionOperators
+        // exprOp.setAlias(op.getAlias());
+        exprOp.setResultType(op.getType());
+        exprOp.setColumn(op.getColNum());
+        // TODO implement this
+//        exprOp.setStar(op.isStar());
+//        exprOp.setOverloaded(op.getOverloaded());
+        logToPhyMap.put(op, exprOp);
+        currentPlan.add(exprOp);
+        
+        // We only have one input so connection is required from only one predecessor
+//        PhysicalOperator from = logToPhyMap.get(op.findReferent(currentOp));
+//        currentPlan.connect(from, exprOp);
+        
+//        List<Operator> predecessors = lp.getPredecessors(op);
+//
+//        // Project might not have any predecessors
+//        if (predecessors == null)
+//            return;
+//
+//        for (Operator lo : predecessors) {
+//            PhysicalOperator from = logToPhyMap.get(lo);
+//            try {
+//                currentPlan.connect(from, exprOp);
+//            } catch (PlanException e) {
+//                int errCode = 2015;
+//                String msg = "Invalid physical operators in the physical plan" ;
+//                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+//            }
+//        }
+//        System.err.println("Exiting Project");
+    }
+    
+    @Override
+    public void visitConstant(org.apache.pig.experimental.logical.expression.ConstantExpression op) throws IOException {
+        String scope = DEFAULT_SCOPE;
+//        System.err.println("Entering Constant");
+        ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,
+                nodeGen.getNextNodeId(scope)));
+        // We dont have aliases in ExpressionOperators
+        // ce.setAlias(op.getAlias());
+        ce.setValue(op.getValue());
+        ce.setResultType(op.getType());
+        //this operator doesn't have any predecessors
+        currentPlan.add(ce);
+        logToPhyMap.put(op, ce);
+//        System.err.println("Exiting Constant");
+    }
+    
+    @Override
+    public void visitCast( CastExpression op ) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        POCast pCast = new POCast(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)));
+//        physOp.setAlias(op.getAlias());
+        currentPlan.add(pCast);
+
+        logToPhyMap.put(op, pCast);
+        ExpressionOperator from = (ExpressionOperator) logToPhyMap.get(op
+                .getExpression());
+        pCast.setResultType(op.getType());
+        FuncSpec lfSpec = op.getFuncSpec();
+        if(null != lfSpec) {
+            pCast.setLoadFSpec(lfSpec);
+        }
+        try {
+            currentPlan.connect(from, pCast);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/GreaterThanEqualExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/GreaterThanEqualExpression.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/GreaterThanEqualExpression.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/GreaterThanEqualExpression.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.expression;
+
+import java.io.IOException;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+public class GreaterThanEqualExpression extends BinaryExpression {
+
+    /**
+     * Will add this operator to the plan and connect it to the 
+     * left and right hand side operators.
+     * @param plan plan this operator is part of
+     * @param lhs expression on its left hand side
+     * @param rhs expression on its right hand side
+     */
+    public GreaterThanEqualExpression(OperatorPlan plan,
+                           LogicalExpression lhs,
+                           LogicalExpression rhs) {
+        super("GreaterThanEqual", plan, DataType.BOOLEAN, lhs, rhs);
+    }
+
+    /**
+     * @link org.apache.pig.experimental.plan.Operator#accept(org.apache.pig.experimental.plan.PlanVisitor)
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalExpressionVisitor)) {
+            throw new IOException("Expected LogicalExpressionVisitor");
+        }
+        ((LogicalExpressionVisitor)v).visitGreaterThanEqual(this);
+    }
+    
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other != null && other instanceof GreaterThanEqualExpression) {
+            GreaterThanEqualExpression eo = (GreaterThanEqualExpression)other;
+            try {
+                return eo.getLhs().isEqual(getLhs()) && eo.getRhs().isEqual(getRhs());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            return false;
+        }
+     }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/GreaterThanExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/GreaterThanExpression.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/GreaterThanExpression.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/GreaterThanExpression.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.expression;
+
+import java.io.IOException;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+public class GreaterThanExpression extends BinaryExpression {
+
+    /**
+     * Will add this operator to the plan and connect it to the 
+     * left and right hand side operators.
+     * @param plan plan this operator is part of
+     * @param lhs expression on its left hand side
+     * @param rhs expression on its right hand side
+     */
+    public GreaterThanExpression(OperatorPlan plan,
+                           LogicalExpression lhs,
+                           LogicalExpression rhs) {
+        super("GreaterThan", plan, DataType.BOOLEAN, lhs, rhs);
+    }
+
+    /**
+     * @link org.apache.pig.experimental.plan.Operator#accept(org.apache.pig.experimental.plan.PlanVisitor)
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalExpressionVisitor)) {
+            throw new IOException("Expected LogicalExpressionVisitor");
+        }
+        ((LogicalExpressionVisitor)v).visitGreaterThan(this);
+    }
+    
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other != null && other instanceof GreaterThanExpression) {
+            GreaterThanExpression eo = (GreaterThanExpression)other;
+            try {
+                return eo.getLhs().isEqual(getLhs()) && eo.getRhs().isEqual(getRhs());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            return false;
+        }
+     }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LessThanEqualExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LessThanEqualExpression.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LessThanEqualExpression.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LessThanEqualExpression.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.expression;
+
+import java.io.IOException;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+public class LessThanEqualExpression extends BinaryExpression {
+
+    /**
+     * Will add this operator to the plan and connect it to the 
+     * left and right hand side operators.
+     * @param plan plan this operator is part of
+     * @param lhs expression on its left hand side
+     * @param rhs expression on its right hand side
+     */
+    public LessThanEqualExpression(OperatorPlan plan,
+                           LogicalExpression lhs,
+                           LogicalExpression rhs) {
+        super("LessThanEqual", plan, DataType.BOOLEAN, lhs, rhs);
+    }
+
+    /**
+     * @link org.apache.pig.experimental.plan.Operator#accept(org.apache.pig.experimental.plan.PlanVisitor)
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalExpressionVisitor)) {
+            throw new IOException("Expected LogicalExpressionVisitor");
+        }
+        ((LogicalExpressionVisitor)v).visitLessThanEqual(this);
+    }
+    
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other != null && other instanceof LessThanEqualExpression) {
+            LessThanEqualExpression eo = (LessThanEqualExpression)other;
+            try {
+                return eo.getLhs().isEqual(getLhs()) && eo.getRhs().isEqual(getRhs());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            return false;
+        }
+     }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LessThanExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LessThanExpression.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LessThanExpression.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LessThanExpression.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.expression;
+
+import java.io.IOException;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+public class LessThanExpression extends BinaryExpression {
+
+    /**
+     * Will add this operator to the plan and connect it to the 
+     * left and right hand side operators.
+     * @param plan plan this operator is part of
+     * @param lhs expression on its left hand side
+     * @param rhs expression on its right hand side
+     */
+    public LessThanExpression(OperatorPlan plan,
+                           LogicalExpression lhs,
+                           LogicalExpression rhs) {
+        super("LessThan", plan, DataType.BOOLEAN, lhs, rhs);
+    }
+
+    /**
+     * @link org.apache.pig.experimental.plan.Operator#accept(org.apache.pig.experimental.plan.PlanVisitor)
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalExpressionVisitor)) {
+            throw new IOException("Expected LogicalExpressionVisitor");
+        }
+        ((LogicalExpressionVisitor)v).visitLessThan(this);
+    }
+    
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other != null && other instanceof LessThanExpression) {
+            LessThanExpression eo = (LessThanExpression)other;
+            try {
+                return eo.getLhs().isEqual(getLhs()) && eo.getRhs().isEqual(getRhs());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            return false;
+        }
+     }
+
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpression.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpression.java Thu Feb 11 22:12:36 2010
@@ -18,6 +18,9 @@
 
 package org.apache.pig.experimental.logical.expression;
 
+import java.io.IOException;
+
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.experimental.plan.OperatorPlan;
 
@@ -28,9 +31,19 @@
  */
 public abstract class LogicalExpression extends Operator {
     
+    static long nextUid = 1;
     protected byte type;
-    protected long uid;
+    protected long uid = -1;
 
+    static public long getNextUid() {
+        return nextUid++;
+    }
+    
+    // used for junit test, should not be called elsewhere
+    static public void resetNextUid() {
+        nextUid = 1;
+    }
+    
     /**
      * 
      * @param name of the operator
@@ -55,15 +68,30 @@
      * @return unique identifier
      */
     public long getUid() {
+        if (uid == -1) {
+            throw new RuntimeException("getUid called before uid set");
+        }
         return uid;
     }
     
     /**
-     * Set the unique identify for this expression
-     * @param uid unique identifier
+     * Set the uid.  For most expressions this will get a new uid.
+     * ProjectExpression needs to override this and find its uid from its
+     * predecessor.
+     * @param currentOp Current LogicalRelationalOperator that this expression operator
+     * is attached to.  Passed so that projection operators can determine their uid.
+     * @throws IOException
      */
-    public void setUid(long uid) {
-       this.uid = uid; 
+    public void setUid(LogicalRelationalOperator currentOp) throws IOException {
+        uid = getNextUid();
+    }
+    
+    /**
+     * Hard code the uid.  This should only be used in testing, never in real
+     * code.
+     * @param uid value to set uid to
+     */
+    public void neverUseForRealSetUid(long uid) {
+        this.uid = uid;
     }
-
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionPlan.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionPlan.java Thu Feb 11 22:12:36 2010
@@ -18,11 +18,30 @@
 
 package org.apache.pig.experimental.logical.expression;
 
+import java.util.List;
+
 import org.apache.pig.experimental.plan.BaseOperatorPlan;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
 
 /**
  * A plan containing LogicalExpressionOperators.
  */
 public class LogicalExpressionPlan extends BaseOperatorPlan {
-
+    
+    @Override
+    public boolean isEqual(OperatorPlan other) {
+        if (other != null && other instanceof LogicalExpressionPlan) {
+            LogicalExpressionPlan otherPlan = (LogicalExpressionPlan)other;
+            List<Operator> roots = getSources();
+            List<Operator> otherRoots = otherPlan.getSources();
+            if (roots.size() == 0 && otherRoots.size() == 0) return true;
+            if (roots.size() > 1 || otherRoots.size() > 1) {
+                throw new RuntimeException("Found LogicalExpressionPlan with more than one root.  Unexpected.");
+            }
+            return roots.get(0).isEqual(otherRoots.get(0));            
+        } else {
+            return false;
+        }
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionVisitor.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionVisitor.java Thu Feb 11 22:12:36 2010
@@ -18,6 +18,7 @@
 
 package org.apache.pig.experimental.logical.expression;
 
+import java.io.IOException;
 import org.apache.pig.experimental.plan.OperatorPlan;
 import org.apache.pig.experimental.plan.PlanVisitor;
 import org.apache.pig.experimental.plan.PlanWalker;
@@ -38,18 +39,33 @@
         }
     }
     
-    public void visitAnd(AndExpression andExpr) {
+    public void visitAnd(AndExpression andExpr) throws IOException {
+    }
+    
+    public void visitOr(OrExpression exp) throws IOException { 
     }
 
-    public void visitEqual(EqualExpression equal) {
+    public void visitEqual(EqualExpression equal) throws IOException {
     }
     
-    public void visitProject(ProjectExpression project) {
+    public void visitProject(ProjectExpression project) throws IOException {
     }
     
-    public void visitConstant(ConstantExpression constant) {
+    public void visitConstant(ConstantExpression constant) throws IOException {
     }
     
+    public void visitCast(CastExpression cast) throws IOException {
+    }
+
+    public void visitGreaterThan(GreaterThanExpression greaterThanExpression) throws IOException {
+    }
     
+    public void visitGreaterThanEqual(GreaterThanEqualExpression op) throws IOException {
+    }
 
+    public void visitLessThan(LessThanExpression lessThanExpression) throws IOException { 
+    }
+    
+    public void visitLessThanEqual(LessThanEqualExpression op) throws IOException {
+    }
 }

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/OrExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/OrExpression.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/OrExpression.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/OrExpression.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.expression;
+
+import java.io.IOException;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+/**
+ * Boolean OR Expression
+ *
+ */
+public class OrExpression extends BinaryExpression {
+
+    /**
+     * Will add this operator to the plan and connect it to the 
+     * left and right hand side operators.
+     * @param plan plan this operator is part of
+     * @param lhs expression on its left hand side
+     * @param rhs expression on its right hand side
+     */
+    public OrExpression(OperatorPlan plan,
+                         LogicalExpression lhs,
+                         LogicalExpression rhs) {
+        super("Or", plan, DataType.BOOLEAN, lhs, rhs);
+    }
+
+    /**
+     * @link org.apache.pig.experimental.plan.Operator#accept(org.apache.pig.experimental.plan.PlanVisitor)
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalExpressionVisitor)) {
+            throw new IOException("Expected LogicalExpressionVisitor");
+        }
+        ((LogicalExpressionVisitor)v).visitOr(this);
+    }
+    
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other != null && other instanceof OrExpression) {
+            OrExpression ao = (OrExpression)other;
+            try {
+                return ao.getLhs().isEqual(getLhs()) && ao.getRhs().isEqual(getRhs());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            return false;
+        }
+        
+    }
+
+}