You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2010/02/11 23:12:43 UTC

svn commit: r909165 [2/6] - in /hadoop/pig/trunk: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/logical/ src/org/apache/pig/experimental/logical/expression/ src/org/apache/pig/experimental/logica...

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ProjectExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ProjectExpression.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ProjectExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/ProjectExpression.java Thu Feb 11 22:12:36 2010
@@ -18,6 +18,12 @@
 
 package org.apache.pig.experimental.logical.expression;
 
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.experimental.plan.OperatorPlan;
 import org.apache.pig.experimental.plan.PlanVisitor;
 
@@ -58,9 +64,9 @@
      * @link org.apache.pig.experimental.plan.Operator#accept(org.apache.pig.experimental.plan.PlanVisitor)
      */
     @Override
-    public void accept(PlanVisitor v) {
+    public void accept(PlanVisitor v) throws IOException {
         if (!(v instanceof LogicalExpressionVisitor)) {
-            throw new RuntimeException("Expected LogicalExpressionVisitor");
+            throw new IOException("Expected LogicalExpressionVisitor");
         }
         ((LogicalExpressionVisitor)v).visitProject(this);
 
@@ -104,4 +110,44 @@
         this.type = type;
     }
 
+    /**
+     * Stamp this projection with the uid of the field it projects.
+     * The uid is copied from field <code>col</code> of the schema of the
+     * operator returned by {@link #findReferent(LogicalRelationalOperator)}.
+     * If that operator has no schema the uid is left as it was.
+     * @param currentOp operator this projection is attached to
+     * @throws IOException if the referent cannot be found
+     */
+    @Override
+    public void setUid(LogicalRelationalOperator currentOp) throws IOException {
+        LogicalSchema referentSchema = findReferent(currentOp).getSchema();
+        if (referentSchema == null) {
+            return;
+        }
+        uid = referentSchema.getField(col).uid;
+    }
+    
+    /**
+     * Locate the relational operator whose output this projection reads.
+     * The referent is the predecessor of <code>currentOp</code>, in the plan
+     * that owns <code>currentOp</code>, at position <code>input</code>.
+     * @param currentOp Current operator this projection is attached to
+     * @return the predecessor this projection refers to
+     * @throws IOException if currentOp has no predecessor at that position,
+     * or the predecessor stored there is null
+     */
+    public LogicalRelationalOperator findReferent(LogicalRelationalOperator currentOp) throws IOException {
+        List<Operator> predecessors = currentOp.getPlan().getPredecessors(currentOp);
+        boolean hasReferent = predecessors != null && input <= predecessors.size() - 1;
+        if (!hasReferent) {
+            throw new IOException("Projection with nothing to reference!");
+        }
+
+        Operator candidate = predecessors.get(input);
+        if (candidate == null) {
+            throw new IOException("Found bad operator in logical plan");
+        }
+        return (LogicalRelationalOperator)candidate;
+    }
+    
+    /**
+     * Two projections are considered equal when they read the same column
+     * of the same input.
+     * @param other operator to compare against; may be null
+     * @return true if other is a ProjectExpression with the same input and
+     * column numbers, false otherwise
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        // instanceof is false for null, so no separate null test is needed
+        if (!(other instanceof ProjectExpression)) {
+            return false;
+        }
+        ProjectExpression that = (ProjectExpression)other;
+        return input == that.input && col == that.col;
+    }
 }

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/UnaryExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/UnaryExpression.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/UnaryExpression.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/UnaryExpression.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.expression;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+
+/**
+ * Superclass for all unary expressions
+ *
+ */
+public abstract class UnaryExpression extends LogicalExpression {
+
+    /**
+     * Will add this operator to the plan and connect it to the
+     * single expression it operates on.
+     * @param name of the operator
+     * @param plan plan this operator is part of
+     * @param b Datatype of this expression
+     * @param exp expression that this expression operates on
+     */
+    public UnaryExpression(String name,
+                            OperatorPlan plan,
+                            byte b,
+                            LogicalExpression exp) {
+        super(name, plan, b);
+        plan.add(this);
+        plan.connect(this, exp);
+    }
+
+    /**
+     * Get the expression that this unary expression operates on, i.e. the
+     * first successor of this operator in its plan.
+     * @return the operand expression, or null if this operator has no
+     * successors in the plan
+     * @throws IOException
+     */
+    public LogicalExpression getExpression() throws IOException {
+        List<Operator> succs = plan.getSuccessors(this);
+        // An empty successor list is treated like a missing one instead of
+        // failing with IndexOutOfBoundsException on get(0).
+        if (succs == null || succs.isEmpty()) {
+            return null;
+        }
+        return (LogicalExpression)succs.get(0);
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/AllExpressionVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/AllExpressionVisitor.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/AllExpressionVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/AllExpressionVisitor.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.optimizer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOForEach;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LOInnerLoad;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LogicalPlanVisitor;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanWalker;
+
+/**
+ * A visitor that walks a logical plan and then applies a given
+ * LogicalExpressionVisitor to all expressions it encounters.
+ * Before an operator's expression plans are visited, that operator is
+ * recorded in {@link #currentOp} so that {@link #getVisitor} implementations
+ * can tell which relational operator the expression belongs to.
+ *
+ */
+public abstract class AllExpressionVisitor extends LogicalPlanVisitor {
+
+    protected LogicalExpressionVisitor exprVisitor;
+    /** Relational operator whose expressions are currently being visited. */
+    protected LogicalRelationalOperator currentOp;
+
+    /**
+     * @param plan LogicalPlan to visit
+     * @param walker Walker to use to visit the plan.
+     */
+    public AllExpressionVisitor(OperatorPlan plan,
+                                PlanWalker walker) {
+        super(plan, walker);
+    }
+
+    /**
+     * Get a new instance of the expression visitor to apply to
+     * a given expression.
+     * @param expr LogicalExpressionPlan that will be visited
+     * @return a new LogicalExpressionVisitor for that expression
+     */
+    abstract protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr);
+
+    /**
+     * Visit the filter's single expression plan.
+     */
+    @Override
+    public void visitLOFilter(LOFilter filter) throws IOException {
+        currentOp = filter;
+        getVisitor(filter.getFilterPlan()).visit();
+    }
+
+    /**
+     * Visit every expression plan of the join.
+     */
+    @Override
+    public void visitLOJoin(LOJoin join) throws IOException {
+        currentOp = join;
+        Collection<LogicalExpressionPlan> joinPlans = join.getExpressionPlans();
+        for (LogicalExpressionPlan joinPlan : joinPlans) {
+            getVisitor(joinPlan).visit();
+        }
+    }
+
+    /**
+     * Walk the foreach's inner plan with this same visitor, using a child
+     * walker spawned from the current one.
+     */
+    @Override
+    public void visitLOForEach(LOForEach foreach) throws IOException {
+        currentOp = foreach;
+        // We have an Inner OperatorPlan in ForEach, so we go ahead
+        // and work on that plan
+        OperatorPlan innerPlan = foreach.getInnerPlan();
+        PlanWalker innerWalker = currentWalker.spawnChildWalker(innerPlan);
+        pushWalker(innerWalker);
+        try {
+            currentWalker.walk(this);
+        } finally {
+            // Restore the outer walker even when the inner walk fails, so
+            // the visitor is not left pointing at the inner plan.
+            popWalker();
+        }
+    }
+
+    /**
+     * Visit every output expression plan of the generate.
+     */
+    @Override
+    public void visitLOGenerate(LOGenerate gen ) throws IOException {
+        currentOp = gen;
+        Collection<LogicalExpressionPlan> outputPlans = gen.getOutputPlans();
+        for (LogicalExpressionPlan outputPlan : outputPlans) {
+            getVisitor(outputPlan).visit();
+        }
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/AllSameVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/AllSameVisitor.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/AllSameVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/AllSameVisitor.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.optimizer;
+
+import java.io.IOException;
+
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LOStore;
+import org.apache.pig.experimental.logical.relational.LogicalPlanVisitor;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanWalker;
+
+/**
+ * A visitor that walks the logical plan and funnels the nodes it visits
+ * into one method.  Subclasses implement {@link #execute} and it is invoked
+ * for each load, filter, join and store operator encountered.
+ * NOTE(review): only those four operator types are overridden here; whether
+ * other operator types should reach execute as well needs confirming.
+ *
+ */
+public abstract class AllSameVisitor extends LogicalPlanVisitor {
+
+    /**
+     * @param plan OperatorPlan to visit
+     * @param walker Walker to use to visit the plan
+     */
+    public AllSameVisitor(OperatorPlan plan, PlanWalker walker) {
+        super(plan, walker);
+    }
+
+    /**
+     * Method to call on every node in the logical plan.
+     * @param op Node that is currently being visited.
+     * @throws IOException if the subclass fails to process the node
+     */
+    abstract protected void execute(LogicalRelationalOperator op) throws IOException;
+
+    /** Forwards the load operator to {@link #execute}. */
+    @Override
+    public void visitLOLoad(LOLoad load) throws IOException {
+        execute(load);
+    }
+
+    /** Forwards the filter operator to {@link #execute}. */
+    @Override
+    public void visitLOFilter(LOFilter filter) throws IOException {
+        execute(filter);
+    }
+
+    /** Forwards the join operator to {@link #execute}. */
+    @Override
+    public void visitLOJoin(LOJoin join) throws IOException {
+        execute(join);
+    }
+
+    /** Forwards the store operator to {@link #execute}. */
+    @Override
+    public void visitLOStore(LOStore store) throws IOException {
+        execute(store);
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/LogicalPlanOptimizer.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/LogicalPlanOptimizer.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/LogicalPlanOptimizer.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.experimental.logical.rules.FilterAboveForeach;
+import org.apache.pig.experimental.logical.rules.MergeFilter;
+import org.apache.pig.experimental.logical.rules.PushUpFilter;
+import org.apache.pig.experimental.logical.rules.SplitFilter;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.optimizer.PlanOptimizer;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+
+/**
+ * Plan optimizer for the experimental logical plan.  It installs three
+ * rule sets (split, push, merge) and registers the listeners that patch
+ * schemas and projections after each transformation.
+ */
+public class LogicalPlanOptimizer extends PlanOptimizer {
+
+    /**
+     * @param p plan to optimize
+     * @param iterations iteration count handed to the PlanOptimizer constructor
+     */
+    public LogicalPlanOptimizer(OperatorPlan p, int iterations) {
+        // The rule sets are not passed to super; they are assigned right after.
+        super(p, null, iterations);
+        ruleSets = buildRuleSets();
+        addListeners();
+    }
+
+    /**
+     * Build the rule sets in the order they are stored: split, push, merge.
+     * @return list of rule sets, one set per phase
+     */
+    protected List<Set<Rule>> buildRuleSets() {
+        List<Set<Rule>> phases = new ArrayList<Set<Rule>>();
+
+        // Split Set
+        // This set of rules does splitting of operators only.
+        // It does not move operators
+        Set<Rule> splitSet = new HashSet<Rule>();
+        phases.add(splitSet);
+        splitSet.add(new SplitFilter("SplitFilter"));
+
+        // Push Set,
+        // This set does moving of operators only.
+        Set<Rule> pushSet = new HashSet<Rule>();
+        phases.add(pushSet);
+        pushSet.add(new PushUpFilter("PushUpFilter"));
+        pushSet.add(new FilterAboveForeach("FilterAboveForEachWithFlatten"));
+
+        // Merge Set
+        // This Set merges operators but does not move them.
+        Set<Rule> mergeSet = new HashSet<Rule>();
+        phases.add(mergeSet);
+        mergeSet.add(new MergeFilter("MergeFilter"));
+
+        return phases;
+    }
+
+    /**
+     * Register the plan transform listeners: SchemaPatcher first, then
+     * ProjectionPatcher.
+     */
+    private void addListeners() {
+        addPlanTransformListener(new SchemaPatcher());
+        addPlanTransformListener(new ProjectionPatcher());
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/ProjectionPatcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/ProjectionPatcher.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/ProjectionPatcher.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/ProjectionPatcher.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.optimizer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.plan.DepthFirstWalker;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.optimizer.PlanTransformListener;
+
+/**
+ * A PlanTransformListener that will patch up references in projections.
+ * After a transformation, every projection found in the transformed plan
+ * has its column number reset to the position of the field, in the
+ * referenced operator's schema, that carries the projection's uid.
+ *
+ */
+public class ProjectionPatcher implements PlanTransformListener {
+
+    /**
+     * Re-point all projections in the transformed plan.
+     * @param fp plan before the transformation (not used here)
+     * @param tp transformed plan whose projections are patched
+     * @throws IOException if a projection cannot be matched to a field
+     * @link org.apache.pig.experimental.plan.optimizer.PlanTransformListener#transformed(org.apache.pig.experimental.plan.OperatorPlan, org.apache.pig.experimental.plan.OperatorPlan)
+     */
+    @Override
+    public void transformed(OperatorPlan fp, OperatorPlan tp)
+            throws IOException {
+        ProjectionFinder pf = new ProjectionFinder(tp);
+        pf.visit();
+    }
+
+    /**
+     * Expression visitor that fixes the column number of each projection
+     * in one expression plan.
+     */
+    private static class ProjectionRewriter extends LogicalExpressionVisitor {
+
+        // Relational operator that owns the expression plan being visited
+        private LogicalRelationalOperator currentOp;
+
+        ProjectionRewriter(OperatorPlan p, LogicalRelationalOperator cop) {
+            super(p, new DepthFirstWalker(p));
+            currentOp = cop;
+        }
+
+        /**
+         * Set the projection's column to the index of the field whose uid
+         * equals the projection's uid.
+         * @throws IOException if the referent cannot be found, or its schema
+         * has no field with the projection's uid
+         */
+        @Override
+        public void visitProject(ProjectExpression p) throws IOException {
+            // Get the uid for this projection.  It must match the uid of the
+            // value it is projecting.
+            long myUid = p.getUid();
+
+            // Find the operator this projection references
+            LogicalRelationalOperator pred = p.findReferent(currentOp);
+
+            LogicalSchema schema = pred.getSchema();
+            if (schema == null) {
+                // No schema to search; ProjectExpression.setUid likewise
+                // tolerates a referent without schema, so leave the column
+                // number untouched instead of failing on a null dereference.
+                return;
+            }
+
+            // Search the schema for the matching uid
+            int match = -1;
+            List<LogicalSchema.LogicalFieldSchema> fields = schema.getFields();
+            for (int i = 0; i < fields.size(); i++) {
+                if (fields.get(i).uid == myUid) {
+                    match = i;
+                    break;
+                }
+            }
+            if (match == -1) {
+                throw new IOException("Couldn't find matching uid for project");
+            }
+            p.setColNum(match);
+        }
+
+    }
+
+    /**
+     * Plan visitor that hands every expression plan it encounters to a
+     * ProjectionRewriter bound to the operator currently being visited.
+     */
+    private static class ProjectionFinder extends AllExpressionVisitor {
+
+        public ProjectionFinder(OperatorPlan plan) {
+            super(plan, new DepthFirstWalker(plan));
+        }
+
+        @Override
+        protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
+            return new ProjectionRewriter(expr, currentOp);
+        }
+
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/SchemaPatcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/SchemaPatcher.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/SchemaPatcher.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/SchemaPatcher.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.optimizer;
+
+import java.io.IOException;
+
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.plan.DependencyOrderWalker;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.optimizer.PlanTransformListener;
+
+/**
+ * A PlanTransformListener for the logical optimizer that will patch up schemas
+ * after a plan has been transformed.
+ *
+ */
+public class SchemaPatcher implements PlanTransformListener {
+
+    /**
+     * Walk the transformed plan, resetting the schema of each visited
+     * operator and then asking for it again.
+     * @param fp plan before the transformation (not used here)
+     * @param tp transformed plan whose schemas are refreshed
+     * @throws IOException
+     * @link org.apache.pig.experimental.plan.optimizer.PlanTransformListener#transformed(org.apache.pig.experimental.plan.OperatorPlan, org.apache.pig.experimental.plan.OperatorPlan)
+     */
+    @Override
+    public void transformed(OperatorPlan fp, OperatorPlan tp) throws IOException {
+        new SchemaVisitor(tp).visit();
+    }
+
+    /**
+     * Visitor that drops and re-requests the schema of each operator it is
+     * handed, walking the plan with a DependencyOrderWalker.
+     */
+    private static class SchemaVisitor extends AllSameVisitor {
+
+        public SchemaVisitor(OperatorPlan plan) {
+            super(plan, new DependencyOrderWalker(plan));
+        }
+
+        @Override
+        protected void execute(LogicalRelationalOperator op) throws IOException {
+            // Clear the schema, then call getSchema again so the operator
+            // regenerates it.
+            op.resetSchema();
+            op.getSchema();
+        }
+
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/UidStamper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/UidStamper.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/UidStamper.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/UidStamper.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.optimizer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.CastExpression;
+import org.apache.pig.experimental.logical.expression.ConstantExpression;
+import org.apache.pig.experimental.logical.expression.EqualExpression;
+import org.apache.pig.experimental.logical.expression.GreaterThanEqualExpression;
+import org.apache.pig.experimental.logical.expression.GreaterThanExpression;
+import org.apache.pig.experimental.logical.expression.LessThanEqualExpression;
+import org.apache.pig.experimental.logical.expression.LessThanExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.experimental.logical.expression.OrExpression;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.experimental.plan.DependencyOrderWalker;
+import org.apache.pig.experimental.plan.DepthFirstWalker;
+import org.apache.pig.experimental.plan.OperatorPlan;
+
+/**
+ * A Visitor to stamp every part of every expression in a tree with a uid.
+ * Schemas of load operators are stamped too: each field, including the
+ * fields of nested schemas, is given the next available uid.
+ */
+public class UidStamper extends AllExpressionVisitor {
+
+    /**
+     * @param plan LogicalPlan that this stamper will act on.
+     */
+    public UidStamper(OperatorPlan plan) {
+        super(plan, new DependencyOrderWalker(plan));
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.experimental.logical.optimizer.AllExpressionVisitor#getVisitor(org.apache.pig.experimental.logical.expression.LogicalExpressionPlan)
+     */
+    @Override
+    protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
+        return new ExprUidStamper(expr);
+    }
+
+    /**
+     * Run the inherited load handling, then stamp the load's schema.
+     */
+    @Override
+    public void visitLOLoad(LOLoad load) throws IOException {
+        super.visitLOLoad(load);
+        stampSchema(load.getSchema());
+    }
+
+    /**
+     * Assign a fresh uid to every field of the schema, recursing into the
+     * schema nested in each field.  A null schema is ignored.
+     */
+    private void stampSchema(LogicalSchema s) {
+        if (s == null) {
+            return;
+        }
+        for (LogicalFieldSchema field : s.getFields()) {
+            field.uid = LogicalExpression.getNextUid();
+            stampSchema(field.schema);
+        }
+    }
+
+    /**
+     * Expression visitor that calls setUid on each expression it visits,
+     * passing the relational operator the enclosing UidStamper is currently
+     * on.  It reads the outer currentOp, so it must remain an inner class.
+     */
+    class ExprUidStamper extends LogicalExpressionVisitor {
+        protected ExprUidStamper(OperatorPlan plan) {
+            super(plan, new DepthFirstWalker(plan));
+        }
+
+        // Leaf and unary expressions
+
+        @Override
+        public void visitProject(ProjectExpression project) throws IOException {
+            project.setUid(currentOp);
+        }
+
+        @Override
+        public void visitConstant(ConstantExpression constant) throws IOException {
+            constant.setUid(currentOp);
+        }
+
+        @Override
+        public void visitCast(CastExpression cast) throws IOException {
+            cast.setUid(currentOp);
+        }
+
+        // Boolean connectives
+
+        @Override
+        public void visitAnd(AndExpression andExpr) throws IOException {
+            andExpr.setUid(currentOp);
+        }
+
+        @Override
+        public void visitOr(OrExpression op) throws IOException {
+            op.setUid(currentOp);
+        }
+
+        // Comparisons
+
+        @Override
+        public void visitEqual(EqualExpression equal) throws IOException {
+            equal.setUid(currentOp);
+        }
+
+        @Override
+        public void visitGreaterThan(GreaterThanExpression greaterThanExpression) throws IOException {
+            greaterThanExpression.setUid(currentOp);
+        }
+
+        @Override
+        public void visitGreaterThanEqual(GreaterThanEqualExpression op) throws IOException {
+            op.setUid(currentOp);
+        }
+
+        @Override
+        public void visitLessThan(LessThanExpression lessThanExpression) throws IOException {
+            lessThanExpression.setUid(currentOp);
+        }
+
+        @Override
+        public void visitLessThanEqual(LessThanEqualExpression op) throws IOException {
+            op.setUid(currentOp);
+        }
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOFilter.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOFilter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOFilter.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+/**
+ * Logical operator for a filter.  It holds the expression plan of the filter
+ * condition and reports the schema of its first predecessor as its own.
+ */
+public class LOFilter extends LogicalRelationalOperator {
+
+    private static final long serialVersionUID = 2L;
+    private LogicalExpressionPlan filterPlan;
+    //private static Log log = LogFactory.getLog(LOFilter.class);
+
+    /**
+     * Creates a filter with no condition plan; one can be supplied later via
+     * {@link #setFilterPlan(LogicalExpressionPlan)}.
+     *
+     * @param plan logical plan this operator is part of
+     */
+    public LOFilter(LogicalPlan plan) {
+        super("LOFilter", plan);
+    }
+
+    /**
+     * Creates a filter with the given condition plan.
+     *
+     * @param plan logical plan this operator is part of
+     * @param filterPlan expression plan of the filter condition
+     */
+    public LOFilter(LogicalPlan plan, LogicalExpressionPlan filterPlan) {
+        super("LOFilter", plan);
+        this.filterPlan = filterPlan;
+    }
+
+    /**
+     * @return the expression plan of the filter condition, or {@code null}
+     *         if none has been set
+     */
+    public LogicalExpressionPlan getFilterPlan() {
+        return filterPlan;
+    }
+
+    /**
+     * @param filterPlan expression plan of the filter condition
+     */
+    public void setFilterPlan(LogicalExpressionPlan filterPlan) {
+        this.filterPlan = filterPlan;
+    }
+
+    /**
+     * Returns the schema of the first predecessor of this operator in the
+     * plan.  The value is looked up again on every call and stored in the
+     * {@code schema} field.
+     *
+     * @return the predecessor's schema (whatever its {@code getSchema()} returns)
+     * @throws RuntimeException if the predecessor cannot be obtained
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        LogicalRelationalOperator input = null;
+        try {
+            input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
+        } catch(Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOFilter.", e);
+        }
+
+        schema = input.getSchema();
+        return schema;
+    }
+
+    /**
+     * Dispatches to {@code visitLOFilter} on the visitor.
+     *
+     * @param v visitor; must be a {@code LogicalPlanVisitor}
+     * @throws IOException if {@code v} is not a {@code LogicalPlanVisitor},
+     *         or if the visit itself throws it
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalPlanVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalPlanVisitor)v).visitLOFilter(this);
+    }
+
+    /**
+     * Two filters are equal when their condition plans are equal and the
+     * inherited {@code checkEquality} test passes.  Filters whose plan has
+     * not been set are only equal to other filters without a plan.
+     *
+     * @param other operator to compare against; may be {@code null}
+     * @return {@code true} if {@code other} is an equal {@code LOFilter}
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOFilter)) {
+            return false;
+        }
+
+        LOFilter of = (LOFilter)other;
+        if (filterPlan == null || of.filterPlan == null) {
+            // The one-argument constructor leaves the plan unset; compare
+            // by presence instead of dereferencing a null plan.
+            return filterPlan == of.filterPlan && checkEquality(of);
+        }
+        return filterPlan.isEqual(of.filterPlan) && checkEquality(of);
+    }
+}
+

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOForEach.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOForEach.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOForEach.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+/**
+ * Logical operator for a foreach.  The work done for each input record is
+ * described by an inner logical plan; the schema of this operator is the
+ * schema of the first sink of that inner plan.
+ */
+public class LOForEach extends LogicalRelationalOperator {
+
+    private static final long serialVersionUID = 2L;
+
+    private LogicalPlan innerPlan;
+
+    /**
+     * Creates a foreach without an inner plan; supply one with
+     * {@link #setInnerPlan(LogicalPlan)}.
+     *
+     * @param plan plan this operator is part of
+     */
+    public LOForEach(OperatorPlan plan) {
+        super("LOForEach", plan);
+    }
+
+    /**
+     * @return the inner plan, or {@code null} if none has been set
+     */
+    public LogicalPlan getInnerPlan() {
+        return innerPlan;
+    }
+
+    /**
+     * @param p the inner plan of this foreach
+     */
+    public void setInnerPlan(LogicalPlan p) {
+        innerPlan = p;
+    }
+
+    /**
+     * Two foreach operators are equal when their inner plans are equal.
+     * Operators without an inner plan are only equal to other operators
+     * without an inner plan.
+     *
+     * @param other operator to compare against; may be {@code null}
+     * @return {@code true} if {@code other} is an equal {@code LOForEach}
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOForEach)) {
+            return false;
+        }
+
+        LogicalPlan otherInner = ((LOForEach)other).innerPlan;
+        if (innerPlan == null || otherInner == null) {
+            // the constructor does not set the inner plan, so it may be absent
+            return innerPlan == otherInner;
+        }
+        return innerPlan.isEqual(otherInner);
+    }
+
+    /**
+     * Returns the schema of the first sink of the inner plan, caching it
+     * once it is known.
+     *
+     * @return the schema, or {@code null} when it cannot be determined
+     *         (no inner plan, no sinks, or the sink's schema is unknown)
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+
+        if (innerPlan == null) {
+            return null;
+        }
+
+        List<Operator> ll = innerPlan.getSinks();
+        // an empty sink list must not be indexed
+        if (ll != null && !ll.isEmpty()) {
+            schema = ((LogicalRelationalOperator)ll.get(0)).getSchema();
+        }
+
+        return schema;
+    }
+
+    /**
+     * Dispatches to {@code visitLOForEach} on the visitor.
+     *
+     * @param v visitor; must be a {@code LogicalPlanVisitor}
+     * @throws IOException if {@code v} is not a {@code LogicalPlanVisitor},
+     *         or if the visit itself throws it
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalPlanVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalPlanVisitor)v).visitLOForEach(this);
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+/**
+ * Logical operator that produces output fields from a list of expression
+ * plans, one plan per output expression, each with a flag saying whether
+ * its result is flattened.
+ */
+public class LOGenerate extends LogicalRelationalOperator {
+    private List<LogicalExpressionPlan> outputPlans;
+    private boolean[] flattenFlags;
+
+    /**
+     * @param plan plan this operator is part of
+     * @param ps expression plans, one per output expression
+     * @param flatten flatten flag for each entry of {@code ps}; indexed in
+     *        parallel with {@code ps}
+     */
+    public LOGenerate(OperatorPlan plan, List<LogicalExpressionPlan> ps, boolean[] flatten) {
+        super("LOGenerate", plan);
+        outputPlans = ps;
+        flattenFlags = flatten;
+    }
+
+    /**
+     * Computes the output schema from the first sink of every output plan and
+     * caches it once it has been built completely.
+     * <ul>
+     * <li>An expression that is neither a tuple nor a bag contributes one
+     *     field with no alias and no nested schema.</li>
+     * <li>A tuple or bag that is a projection takes alias and nested schema
+     *     from the projected column of the referent's schema, if known.</li>
+     * <li>A flattened tuple contributes the fields of its nested schema; a
+     *     flattened bag contributes the fields of its single tuple field.
+     *     If those fields are unknown the whole schema is unknown.</li>
+     * </ul>
+     *
+     * @return the schema, or {@code null} if it cannot be determined
+     * @throws RuntimeException if the referent of a projection cannot be found
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+
+        // Build into a local: if anything below throws, the cached field
+        // must not be left holding a half-populated schema.
+        LogicalSchema result = new LogicalSchema();
+
+        for(int i=0; i<outputPlans.size(); i++) {
+            LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSinks().get(0);
+            byte t = exp.getType();
+            LogicalSchema fieldSchema = null;
+            String alias = null;
+
+            // if type is primitive, just add to schema
+            if (t != DataType.TUPLE && t != DataType.BAG) {
+                result.addField(new LogicalSchema.LogicalFieldSchema(alias, fieldSchema, t, exp.getUid()));
+                continue;
+            }
+
+            // for tuple and bag type, if there is projection, calculate schema of this field
+            if (exp instanceof ProjectExpression) {
+                ProjectExpression proj = (ProjectExpression)exp;
+                LogicalRelationalOperator op = null;
+                try {
+                    op = proj.findReferent(this);
+                } catch(Exception e) {
+                    throw new RuntimeException(e);
+                }
+                LogicalSchema s = op.getSchema();
+                if (s != null) {
+                    LogicalFieldSchema projected = s.getField(proj.getColNum());
+                    fieldSchema = projected.schema;
+                    alias = projected.alias;
+                }
+            }
+
+            if (!flattenFlags[i]) {
+                result.addField(new LogicalSchema.LogicalFieldSchema(alias, fieldSchema, t, exp.getUid()));
+                continue;
+            }
+
+            // flatten is set: the fields of the nested schema become output fields
+            if (t == DataType.BAG) {
+                // if it is bag of tuples, get the schema of tuples
+                if (fieldSchema != null && fieldSchema.size() == 1
+                    && fieldSchema.getField(0).type == DataType.TUPLE) {
+                    fieldSchema = fieldSchema.getField(0).schema;
+                } else {
+                    fieldSchema = null;
+                }
+            }
+
+            if (fieldSchema == null) {
+                // flattened fields are unknown, so the whole schema is unknown;
+                // the cached field is still null at this point
+                return null;
+            }
+
+            for(LogicalFieldSchema f: fieldSchema.getFields()) {
+                result.addField(f);
+            }
+        }
+
+        schema = result;
+        return schema;
+    }
+
+    /**
+     * @return the expression plans, one per output expression
+     */
+    public List<LogicalExpressionPlan> getOutputPlans() {
+        return outputPlans;
+    }
+
+    /**
+     * @return the flatten flags, indexed in parallel with the output plans
+     */
+    public boolean[] getFlattenFlags() {
+        return flattenFlags;
+    }
+
+    /**
+     * Two generate operators are equal when they have the same number of
+     * output plans and, position by position, the same flatten flag and
+     * equal plans.
+     *
+     * @param other operator to compare against; may be {@code null}
+     * @return {@code true} if {@code other} is an equal {@code LOGenerate}
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOGenerate)) {
+            return false;
+        }
+
+        List<LogicalExpressionPlan> otherPlan = ((LOGenerate)other).getOutputPlans();
+        boolean[] fs = ((LOGenerate)other).getFlattenFlags();
+
+        if (outputPlans.size() != otherPlan.size()) {
+            return false;
+        }
+
+        for(int i=0; i<outputPlans.size(); i++) {
+            if (flattenFlags[i] != fs[i]) {
+                return false;
+            }
+
+            if (!outputPlans.get(i).isEqual(otherPlan.get(i))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Dispatches to {@code visitLOGenerate} on the visitor.
+     *
+     * @param v visitor; must be a {@code LogicalPlanVisitor}
+     * @throws IOException if {@code v} is not a {@code LogicalPlanVisitor},
+     *         or if the visit itself throws it
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalPlanVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalPlanVisitor)v).visitLOGenerate(this);
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+/**
+ * Operator to map the data into the inner plan of LOForEach.
+ * It can only be used in the inner plan of LOForEach.
+ * Its schema is the single column {@code colNum} of the schema of the
+ * enclosing foreach's first predecessor.
+ */
+public class LOInnerLoad extends LogicalRelationalOperator {
+    private int colNum;
+    private LOForEach foreach;
+
+    /**
+     * @param plan plan this operator is part of
+     * @param foreach the foreach operator whose input this operator reads
+     * @param colNum index of the input column this operator exposes
+     */
+    public LOInnerLoad(OperatorPlan plan, LOForEach foreach, int colNum) {
+        super("LOInnerLoad", plan);
+        this.colNum = colNum;
+        this.foreach = foreach;
+    }
+
+    /**
+     * Returns a one-field schema holding column {@code colNum} of the schema
+     * of the foreach's first predecessor, caching it once it has been built.
+     *
+     * @return the schema, or {@code null} if the predecessor's schema is unknown
+     * @throws RuntimeException wrapping any failure while looking up the
+     *         predecessor or the column
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+
+        LogicalPlan p = (LogicalPlan)foreach.getPlan();
+        try {
+            LogicalRelationalOperator op = (LogicalRelationalOperator)p.getPredecessors(foreach).get(0);
+            LogicalSchema inputSchema = op.getSchema();
+            if (inputSchema != null) {
+                // Fill a local first: if the column lookup throws, the cached
+                // field must not be left holding an empty schema.
+                LogicalSchema result = new LogicalSchema();
+                result.addField(inputSchema.getField(colNum));
+                schema = result;
+            }
+        } catch(Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        return schema;
+    }
+
+    /**
+     * Two inner loads are equal when they read the same column number.
+     *
+     * @param other operator to compare against; may be {@code null}
+     * @return {@code true} if {@code other} is an {@code LOInnerLoad} with
+     *         the same column number
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOInnerLoad)) {
+            return false;
+        }
+
+        return (colNum == ((LOInnerLoad)other).colNum);
+    }
+
+    /**
+     * Dispatches to {@code visitLOInnerLoad} on the visitor.
+     *
+     * @param v visitor; must be a {@code LogicalPlanVisitor}
+     * @throws IOException if {@code v} is not a {@code LogicalPlanVisitor},
+     *         or if the visit itself throws it
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalPlanVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalPlanVisitor)v).visitLOInnerLoad(this);
+    }
+
+    /**
+     * @return index of the input column this operator exposes
+     */
+    public int getColNum() {
+        return colNum;
+    }
+
+    /**
+     * Get the LOForEach operator that contains this operator as part of inner plan
+     * @return the LOForEach operator
+     */
+    public LOForEach getLOForEach() {
+        return foreach;
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOJoin.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOJoin.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOJoin.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.impl.util.MultiMap;
+
+
+/**
+ * Logical operator for a join.  For every input it holds the expression
+ * plans of the join keys and a flag saying whether that input is inner; it
+ * also records which join implementation was requested.
+ * <p>
+ * The one-argument constructor leaves the join plans, inner flags and join
+ * type unset; methods that read them expect them to have been supplied
+ * through the four-argument constructor.
+ */
+public class LOJoin extends LogicalRelationalOperator {
+    private static final long serialVersionUID = 2L;
+
+    /**
+     * Enum for the type of join
+     */
+    public static enum JOINTYPE {
+        HASH,    // Hash Join
+        REPLICATED, // Fragment Replicated join
+        SKEWED, // Skewed Join
+        MERGE   // Sort Merge Join
+    };
+
+    //private static Log log = LogFactory.getLog(LOJoin.class);
+    // expression plans for each input, keyed by input index
+    private MultiMap<Integer, LogicalExpressionPlan> mJoinPlans;
+    // indicator for each input whether it is inner
+    private boolean[] mInnerFlags;
+    private JOINTYPE mJoinType; // Retains the type of the join
+
+    /**
+     * Creates a join with no join plans, inner flags or join type.
+     *
+     * @param plan logical plan this operator is part of
+     */
+    public LOJoin(LogicalPlan plan) {
+        super("LOJoin", plan);
+    }
+
+    /**
+     * @param plan logical plan this operator is part of
+     * @param joinPlans join-key expression plans, keyed by input index
+     * @param jt type of the join
+     * @param isInner for each input, whether it is inner
+     */
+    public LOJoin(LogicalPlan plan,
+                MultiMap<Integer, LogicalExpressionPlan> joinPlans,
+                JOINTYPE jt,
+                boolean[] isInner) {
+        super("LOJoin", plan);
+        mJoinPlans = joinPlans;
+        mJoinType = jt;
+        mInnerFlags = isInner;
+    }
+
+    /**
+     * @param inputIndex index of the input
+     * @return whether that input is inner
+     */
+    public boolean isInner(int inputIndex) {
+        return mInnerFlags[inputIndex];
+    }
+
+    /**
+     * @return the inner flags of all inputs (the internal array, not a copy)
+     */
+    public boolean[] getInnerFlags() {
+        return mInnerFlags;
+    }
+
+    /**
+     * @return the type of the join
+     */
+    public JOINTYPE getJoinType() {
+        return mJoinType;
+    }
+
+    /**
+     * @param inputIndex index of the input
+     * @return the join-key expression plans stored for that input
+     */
+    public Collection<LogicalExpressionPlan> getJoinPlan(int inputIndex) {
+        return mJoinPlans.get(inputIndex);
+    }
+
+    /**
+     * Get all of the expressions plans that are in this join.
+     * @return collection of all expression plans.
+     */
+    public Collection<LogicalExpressionPlan> getExpressionPlans() {
+        return mJoinPlans.values();
+    }
+
+    /**
+     * Returns the concatenation of the schemas of all inputs, caching it
+     * once built.  A field that has an alias is renamed to
+     * {@code <input alias>::<field alias>}; type, nested schema and uid are
+     * carried over.
+     *
+     * @return the schema, or {@code null} if there are no predecessors or
+     *         the schema of any input is unknown
+     * @throws RuntimeException if the predecessors cannot be obtained
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        // if schema is calculated before, just return
+        if (schema != null) {
+            return schema;
+        }
+
+        List<Operator> inputs = null;
+        try {
+            inputs = plan.getPredecessors(this);
+            if (inputs == null) {
+                return null;
+            }
+        } catch(Exception e) {
+            throw new RuntimeException("Unable to get predecessors of LOJoin operator. ", e);
+        }
+
+        // Collect the fields first; the cached schema is only assigned once
+        // every input turned out to have a known schema.
+        List<LogicalSchema.LogicalFieldSchema> fss = new ArrayList<LogicalSchema.LogicalFieldSchema>();
+
+        for (Operator op : inputs) {
+            LogicalRelationalOperator input = (LogicalRelationalOperator)op;
+            LogicalSchema inputSchema = input.getSchema();
+            // the schema of one input is unknown, so the join schema is unknown, just return
+            if (inputSchema == null) {
+                return null;
+            }
+
+            for (int i=0; i<inputSchema.size(); i++) {
+                LogicalSchema.LogicalFieldSchema fs = inputSchema.getField(i);
+                String alias = fs.alias;
+                if (alias != null) {
+                    alias = input.getAlias()+"::"+fs.alias;
+                }
+                fss.add(new LogicalSchema.LogicalFieldSchema(alias, fs.schema, fs.type, fs.uid));
+            }
+        }
+
+        schema = new LogicalSchema();
+        for(LogicalSchema.LogicalFieldSchema fieldSchema: fss) {
+            schema.addField(fieldSchema);
+        }
+
+        return schema;
+    }
+
+    /**
+     * Dispatches to {@code visitLOJoin} on the visitor.
+     *
+     * @param v visitor; must be a {@code LogicalPlanVisitor}
+     * @throws IOException if {@code v} is not a {@code LogicalPlanVisitor},
+     *         or if the visit itself throws it
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalPlanVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalPlanVisitor)v).visitLOJoin(this);
+
+    }
+
+    /**
+     * Two joins are equal when they have the same join type, the same inner
+     * flags, pass the inherited {@code checkEquality} test, and hold, for
+     * every input index, equal join-key plans in the same order.
+     *
+     * @param other operator to compare against; may be {@code null}
+     * @return {@code true} if {@code other} is an equal {@code LOJoin}
+     * @throws RuntimeException if the plans of an input are not held in a list
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOJoin)) {
+            return false;
+        }
+
+        LOJoin oj = (LOJoin)other;
+        if (mJoinType != oj.mJoinType) return false;
+        if (mInnerFlags.length != oj.mInnerFlags.length) return false;
+        for (int i = 0; i < mInnerFlags.length; i++) {
+            if (mInnerFlags[i] != oj.mInnerFlags[i]) return false;
+        }
+        if (!checkEquality(oj)) return false;
+
+        if (mJoinPlans.size() != oj.mJoinPlans.size())  return false;
+
+        // Now, we need to make sure that for each input we are projecting
+        // the same columns.  MultiMap doesn't return keys in any particular
+        // order, so each input is looked up by its key in the other join.
+        for (Integer p : mJoinPlans.keySet()) {
+            // An input that the other join does not have makes the joins
+            // unequal; it must never be compared against some other input.
+            if (!oj.mJoinPlans.keySet().contains(p)) {
+                return false;
+            }
+
+            Collection<LogicalExpressionPlan> c = mJoinPlans.get(p);
+            Collection<LogicalExpressionPlan> oc = oj.mJoinPlans.get(p);
+            if (c.size() != oc.size()) return false;
+
+            if (!(c instanceof List) || !(oc instanceof List)) {
+                throw new RuntimeException(
+                    "Expected list of expression plans");
+            }
+            List<LogicalExpressionPlan> elist = (List<LogicalExpressionPlan>)c;
+            List<LogicalExpressionPlan> oelist = (List<LogicalExpressionPlan>)oc;
+            for (int i = 0; i < elist.size(); i++) {
+                if (!elist.get(i).isEqual(oelist.get(i))) return false;
+            }
+        }
+        return true;
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java Thu Feb 11 22:12:36 2010
@@ -18,12 +18,16 @@
 
 package org.apache.pig.experimental.logical.relational;
 
-import org.apache.pig.FuncSpec;
+import java.io.IOException;
+
+import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.impl.io.FileSpec;
 
 public class LOLoad extends LogicalRelationalOperator {
     
     private LogicalSchema scriptSchema;
+    private FileSpec fs;
 
     /**
      * 
@@ -32,9 +36,10 @@
      * specified.
      * @param plan logical plan this load is part of.
      */
-    public LOLoad(FuncSpec loader, LogicalSchema schema, LogicalPlan plan) {
+    public LOLoad(FileSpec loader, LogicalSchema schema, LogicalPlan plan) {
        super("LOLoad", plan);
        scriptSchema = schema;
+       fs = loader;
     }
     
     /**
@@ -65,13 +70,35 @@
         return null;
     }
 
+    /**
+     * Returns the file spec this load was constructed with.
+     *
+     * @return the {@code FileSpec} passed to the constructor as {@code loader}
+     */
+    public FileSpec getFileSpec() {
+        return fs;
+    }
+    
     @Override
-    public void accept(PlanVisitor v) {
+    public void accept(PlanVisitor v) throws IOException {
         if (!(v instanceof LogicalPlanVisitor)) {
-            throw new RuntimeException("Expected LogicalPlanVisitor");
+            throw new IOException("Expected LogicalPlanVisitor");
         }
         ((LogicalPlanVisitor)v).visitLOLoad(this);
 
     }
-
+    
+    /**
+     * Two loads are equal when the inherited {@code checkEquality} test
+     * passes and their file specs are equal (or both absent).
+     *
+     * @param other operator to compare against; may be {@code null}
+     * @return {@code true} if {@code other} is an equal {@code LOLoad}
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOLoad)) {
+            return false;
+        }
+
+        LOLoad that = (LOLoad)other;
+        if (!checkEquality(that)) {
+            return false;
+        }
+
+        // a missing file spec only matches another missing file spec
+        return (fs == null) ? (that.fs == null) : fs.equals(that.fs);
+    }
 }

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOStore.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOStore.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOStore.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+
+/**
+ * Logical operator for a store.  It holds the output file spec and the
+ * store function instantiated from it, and reports the schema of its first
+ * predecessor as its own.
+ */
+public class LOStore extends LogicalRelationalOperator {
+    private static final long serialVersionUID = 2L;
+
+    private FileSpec output;
+    private transient StoreFunc storeFunc;
+
+    //private static Log log = LogFactory.getLog(LOStore.class);
+
+    /**
+     * Creates a store with no output spec and no store function.
+     *
+     * @param plan logical plan this operator is part of
+     */
+    public LOStore(LogicalPlan plan) {
+        super("LOStore", plan);
+    }
+
+    /**
+     * Creates a store for the given output and instantiates the store
+     * function named by its func spec.
+     *
+     * @param plan logical plan this operator is part of
+     * @param outputFileSpec where and how the data is stored
+     * @throws RuntimeException if the store function cannot be instantiated
+     */
+    public LOStore(LogicalPlan plan, FileSpec outputFileSpec) {
+        super("LOStore", plan);
+        this.output = outputFileSpec;
+
+        try {
+            Object func = PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec());
+            this.storeFunc = (StoreFunc) func;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to instantiate StoreFunc.", e);
+        }
+    }
+
+    /**
+     * @return the output file spec, or {@code null} if none was given
+     */
+    public FileSpec getOutputSpec() {
+        return output;
+    }
+
+    /**
+     * @return the store function, or {@code null} if none was instantiated
+     */
+    public StoreFunc getStoreFunc() {
+        return storeFunc;
+    }
+
+    /**
+     * Returns the schema of the first predecessor of this operator.
+     *
+     * @return the predecessor's schema (whatever its {@code getSchema()} returns)
+     * @throws RuntimeException wrapping any failure while obtaining the
+     *         predecessor or its schema
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        try {
+            Operator predecessor = plan.getPredecessors(this).get(0);
+            return ((LogicalRelationalOperator) predecessor).getSchema();
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOStore.", e);
+        }
+    }
+
+    /**
+     * Dispatches to {@code visitLOStore} on the visitor.
+     *
+     * @param v visitor; must be a {@code LogicalPlanVisitor}
+     * @throws IOException if {@code v} is not a {@code LogicalPlanVisitor},
+     *         or if the visit itself throws it
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (v instanceof LogicalPlanVisitor) {
+            ((LogicalPlanVisitor) v).visitLOStore(this);
+            return;
+        }
+        throw new IOException("Expected LogicalPlanVisitor");
+    }
+
+    /**
+     * Two stores are equal when the inherited {@code checkEquality} test
+     * passes and their output specs are equal (or both absent).  The store
+     * function is not compared since it is instantiated from the output spec.
+     *
+     * @param other operator to compare against; may be {@code null}
+     * @return {@code true} if {@code other} is an equal {@code LOStore}
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOStore)) {
+            return false;
+        }
+
+        LOStore that = (LOStore) other;
+        if (!checkEquality(that)) {
+            return false;
+        }
+
+        if (output == null || that.output == null) {
+            // equal only when both sides lack an output spec
+            return output == that.output;
+        }
+        return output.equals(that.output);
+    }
+}