You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2010/03/05 22:55:20 UTC

svn commit: r919634 [2/3] - in /hadoop/pig/trunk: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/logical/ src/org/apache/pig/experimental/logical/expression/ src/org/apache/pig/experimental/logica...

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/PlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/PlanPrinter.java?rev=919634&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/PlanPrinter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/PlanPrinter.java Fri Mar  5 21:55:19 2010
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.optimizer;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.pig.experimental.logical.expression.AddExpression;
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.CastExpression;
+import org.apache.pig.experimental.logical.expression.ConstantExpression;
+import org.apache.pig.experimental.logical.expression.DivideExpression;
+import org.apache.pig.experimental.logical.expression.EqualExpression;
+import org.apache.pig.experimental.logical.expression.GreaterThanEqualExpression;
+import org.apache.pig.experimental.logical.expression.GreaterThanExpression;
+import org.apache.pig.experimental.logical.expression.IsNullExpression;
+import org.apache.pig.experimental.logical.expression.LessThanEqualExpression;
+import org.apache.pig.experimental.logical.expression.LessThanExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.experimental.logical.expression.MapLookupExpression;
+import org.apache.pig.experimental.logical.expression.ModExpression;
+import org.apache.pig.experimental.logical.expression.MultiplyExpression;
+import org.apache.pig.experimental.logical.expression.NegativeExpression;
+import org.apache.pig.experimental.logical.expression.NotEqualExpression;
+import org.apache.pig.experimental.logical.expression.NotExpression;
+import org.apache.pig.experimental.logical.expression.OrExpression;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.expression.SubtractExpression;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOForEach;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LOInnerLoad;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LOStore;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.plan.DepthFirstWalker;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.experimental.plan.PlanWalker;
+import org.apache.pig.experimental.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.impl.plan.optimizer.Rule.WalkerAlgo;
+
+public class PlanPrinter extends AllExpressionVisitor {
+
+    protected PrintStream stream = null;
+    protected int level = 0;
+    
+//    private String TAB1 = "    ";
+//    private String TABMore = "|   ";
+//    private String LSep = "|\n|---";
+//    private String USep = "|   |\n|   ";
+//    private int levelCntr = -1;
+    
+    public class DepthFirstMemoryWalker extends DepthFirstWalker {
+        
+        private int level = 0;
+        private int startingLevel = 0;
+        private Stack<String> prefixStack;
+        private String currentPrefix = "";
+        
+        public DepthFirstMemoryWalker(OperatorPlan plan, int startingLevel) {
+            super(plan);
+            level = startingLevel;
+            this.startingLevel = startingLevel;
+            prefixStack = new Stack<String>();
+        }
+
+        @Override
+        public PlanWalker spawnChildWalker(OperatorPlan plan) {
+            return new DepthFirstMemoryWalker(plan, level);
+        }
+
+        /**
+         * Begin traversing the graph.
+         * @param visitor Visitor this walker is being used by.
+         * @throws IOException if an error is encountered while walking.
+         */
+        @Override
+        public void walk(PlanVisitor visitor) throws IOException {
+            List<Operator> roots = plan.getSources();
+            Set<Operator> seen = new HashSet<Operator>();
+
+            depthFirst(null, roots, seen, visitor);
+        }
+        
+        public String getPrefix() {
+            return currentPrefix;
+        }
+
+        private void depthFirst(Operator node,
+                                Collection<Operator> successors,
+                                Set<Operator> seen,
+                                PlanVisitor visitor) throws IOException {
+            if (successors == null) return;
+            
+            StringBuilder strb = new StringBuilder(); 
+            for(int i = 0; i < startingLevel; i++ ) {
+                strb.append("|\t");
+            }
+            if( ((level-1) - startingLevel ) >= 0 )
+                strb.append("\t");
+            for(int i = 0; i < ((level-1) - startingLevel ); i++ ) {
+                strb.append("|\t");
+            }
+            strb.append( "|\n" );
+            for(int i = 0; i < startingLevel; i++ ) {
+                strb.append("|\t");
+            }
+            if( ((level-1) - startingLevel ) >= 0 )
+                strb.append("\t");
+            for(int i = 0; i < ((level-1) - startingLevel ); i++ ) {
+                strb.append("|\t");
+            }
+            strb.append("|---");
+            currentPrefix = strb.toString();
+
+            for (Operator suc : successors) {
+                if (seen.add(suc)) {
+                    suc.accept(visitor);
+                    Collection<Operator> newSuccessors = plan.getSuccessors(suc);
+                    level++;
+                    prefixStack.push(currentPrefix);
+                    depthFirst(suc, newSuccessors, seen, visitor);
+                    level--;
+                    currentPrefix = prefixStack.pop();
+                }
+            }
+        }
+    }
+    
+    public PlanPrinter(OperatorPlan plan, PrintStream ps) {
+        super(plan, new ReverseDependencyOrderWalker(plan)); // relational operators are walked with a ReverseDependencyOrderWalker
+        stream = ps; // all printed output is written to this stream
+    }
+
+    @Override
+    protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
+        return new ExprPrinter(expr, level+1); // the expression printer starts at the current level plus one
+    }
+
+    class ExprPrinter extends LogicalExpressionVisitor {
+        // Expression-plan printer: every visit method below prints its node through simplevisit().
+        protected ExprPrinter(OperatorPlan plan, int startingLevel) {
+            super(plan, new DepthFirstMemoryWalker(plan, startingLevel));
+        }
+        // Prints the walker's current prefix followed by the expression's toString() and a newline.
+        private void simplevisit(LogicalExpression exp) {
+            stream.print( ((DepthFirstMemoryWalker)currentWalker).getPrefix() );
+            stream.println( exp.toString() );
+        }
+
+        @Override
+        public void visitAnd(AndExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+
+        @Override
+        public void visitOr(OrExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+
+        @Override
+        public void visitEqual(EqualExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+
+        @Override
+        public void visitProject(ProjectExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+        
+        @Override
+        public void visitMapLookup(MapLookupExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+
+        @Override
+        public void visitConstant(ConstantExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+
+        @Override
+        public void visitCast(CastExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+
+        @Override
+        public void visitGreaterThan(GreaterThanExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+
+        @Override
+        public void visitGreaterThanEqual(GreaterThanEqualExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+
+        @Override
+        public void visitLessThan(LessThanExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+
+        @Override
+        public void visitLessThanEqual(LessThanEqualExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+        
+        @Override
+        public void visitNotEqual(NotEqualExpression exp) throws IOException { 
+            simplevisit(exp);
+        }
+
+        @Override
+        public void visitNot(NotExpression exp ) throws IOException {
+            simplevisit(exp);
+        }
+
+        @Override
+        public void visitIsNull(IsNullExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+        
+        @Override
+        public void visitNegative(NegativeExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+        
+        @Override
+        public void visitAdd(AddExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+        
+        @Override
+        public void visitSubtract(SubtractExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+        
+        @Override
+        public void visitMultiply(MultiplyExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+        
+        @Override
+        public void visitMod(ModExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+        
+        @Override
+        public void visitDivide(DivideExpression exp) throws IOException {
+            simplevisit(exp);
+        }
+    }
+
+    @Override
+    public void visitLOLoad(LOLoad op) throws IOException {
+        printLevel(); // connector line and "|---" marker at the current level
+        stream.println( op.toString() ); // the operator's own description ends the line
+    }
+
+    @Override
+    public void visitLOStore(LOStore op) throws IOException {
+        printLevel(); // connector line and "|---" marker at the current level
+        stream.println( op.toString() ); // the operator's own description ends the line
+    }
+
+    @Override
+    public void visitLOForEach(LOForEach op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+        // Walk the nested plan one level deeper with a walker spawned from the current one.
+        level++;
+        PlanWalker innerWalker = currentWalker.spawnChildWalker(op.getInnerPlan());
+        pushWalker(innerWalker);
+        currentWalker.walk(this);
+        popWalker();
+        level--;
+    }
+
+    @Override
+    public void visitLOFilter(LOFilter op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+        LogicalExpressionVisitor v = getVisitor(op.getFilterPlan()); // created before level++, so it is built from the pre-increment level
+        level++;
+        v.visit(); // walks the filter's expression plan with the printer created above
+        level--;
+    }
+
+    @Override
+    public void visitLOGenerate(LOGenerate op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+        // Print every output expression plan of this operator.
+        List<LogicalExpressionPlan> outputPlans = op.getOutputPlans();
+        // getVisitor() runs after the increment, so each printer is built from the raised level.
+        level++;
+        for (LogicalExpressionPlan outputPlan : outputPlans) {
+            getVisitor(outputPlan).visit();
+        }
+        level--;
+    }
+
+    @Override
+    public void visitLOInnerLoad(LOInnerLoad op) throws IOException {
+        printLevel(); // connector line and "|---" marker at the current level
+        stream.println( op.toString() ); // the operator's own description ends the line
+    }
+
+    public String toString() {
+        return stream.toString(); // NOTE(review): java.io.PrintStream does not override toString(), so this presumably yields the stream's identity string, not the printed plan - confirm intent
+    }   
+    
+    private void printLevel() {
+        // Build the "|\t" indentation for the current nesting level once.
+        StringBuilder indent = new StringBuilder();
+        for (int depth = 0; depth < level; depth++) {
+            indent.append("|\t");
+        }
+        // A bare connector line first, then the branch marker that precedes the operator text.
+        stream.println(indent + "|");
+        stream.print(indent + "|---");
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/UidStamper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/UidStamper.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/UidStamper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/UidStamper.java Fri Mar  5 21:55:19 2010
@@ -21,19 +21,29 @@
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.pig.experimental.logical.expression.AddExpression;
 import org.apache.pig.experimental.logical.expression.AndExpression;
 import org.apache.pig.experimental.logical.expression.CastExpression;
 import org.apache.pig.experimental.logical.expression.ConstantExpression;
+import org.apache.pig.experimental.logical.expression.DivideExpression;
 import org.apache.pig.experimental.logical.expression.EqualExpression;
 import org.apache.pig.experimental.logical.expression.GreaterThanEqualExpression;
 import org.apache.pig.experimental.logical.expression.GreaterThanExpression;
+import org.apache.pig.experimental.logical.expression.IsNullExpression;
 import org.apache.pig.experimental.logical.expression.LessThanEqualExpression;
 import org.apache.pig.experimental.logical.expression.LessThanExpression;
 import org.apache.pig.experimental.logical.expression.LogicalExpression;
 import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.experimental.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.experimental.logical.expression.MapLookupExpression;
+import org.apache.pig.experimental.logical.expression.ModExpression;
+import org.apache.pig.experimental.logical.expression.MultiplyExpression;
+import org.apache.pig.experimental.logical.expression.NegativeExpression;
+import org.apache.pig.experimental.logical.expression.NotEqualExpression;
+import org.apache.pig.experimental.logical.expression.NotExpression;
 import org.apache.pig.experimental.logical.expression.OrExpression;
 import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.expression.SubtractExpression;
 import org.apache.pig.experimental.logical.relational.LOLoad;
 import org.apache.pig.experimental.logical.relational.LogicalSchema;
 import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
@@ -98,6 +108,11 @@
         public void visitProject(ProjectExpression project) throws IOException {
             project.setUid(currentOp);
         }
+        
+        @Override
+        public void visitMapLookup( MapLookupExpression op ) throws IOException {
+            op.setUid(currentOp); // same setUid(currentOp) call as the neighbouring visit methods
+        }
     
         @Override
         public void visitConstant(ConstantExpression constant) throws IOException {
@@ -108,6 +123,48 @@
         public void visitCast(CastExpression cast) throws IOException {
             cast.setUid(currentOp);
         }
+        
+        @Override
+        public void visitNotEqual(NotEqualExpression exp) throws IOException { 
+            exp.setUid(currentOp);
+        }
+
+        @Override
+        public void visitNot(NotExpression exp ) throws IOException {
+            exp.setUid(currentOp);
+        }
+
+        @Override
+        public void visitIsNull(IsNullExpression exp) throws IOException {
+            exp.setUid(currentOp);
+        }
+        
+        @Override
+        public void visitNegative(NegativeExpression exp) throws IOException {
+            exp.setUid(currentOp);
+        }
+        
+        @Override
+        public void visitAdd(AddExpression exp) throws IOException {
+            exp.setUid(currentOp);
+        }
+        
+        @Override
+        public void visitSubtract(SubtractExpression exp) throws IOException {
+            exp.setUid(currentOp);
+        }
+       
+        @Override public void visitMultiply(MultiplyExpression op) throws IOException {
+            op.setUid(currentOp); // annotated like its siblings so a signature drift is caught at compile time
+        }
+        
+        @Override public void visitMod(ModExpression op) throws IOException {
+            op.setUid(currentOp);
+        }
+        
+        @Override public void visitDivide(DivideExpression op) throws IOException {
+            op.setUid(currentOp);
+        }
     }
     
     /* (non-Javadoc)

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOForEach.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOForEach.java Fri Mar  5 21:55:19 2010
@@ -18,6 +18,7 @@
 package org.apache.pig.experimental.logical.relational;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.pig.experimental.plan.Operator;
@@ -50,6 +51,17 @@
         
         return innerPlan.isEqual(((LOForEach)other).innerPlan);
     }
+    
+    @Override
+    public void resetSchema() {
+        super.resetSchema();
+        // Also reset the schema of every operator in the inner plan.
+        Iterator<Operator> iter = innerPlan.getOperators();
+        while(iter.hasNext()) {
+             LogicalRelationalOperator op = (LogicalRelationalOperator)iter.next();
+             op.resetSchema();
+        }
+    }
        
     @Override
     public LogicalSchema getSchema() {

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java Fri Mar  5 21:55:19 2010
@@ -48,20 +48,13 @@
         schema = new LogicalSchema();
         
         for(int i=0; i<outputPlans.size(); i++) {
-            LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSinks().get(0);
+            LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0);
             byte t = exp.getType();
             LogicalSchema fieldSchema = null;
             String alias = null;
             
-            // if type is primitive, just add to schema
-            if (t != DataType.TUPLE && t != DataType.BAG) {
-                LogicalFieldSchema f = new LogicalSchema.LogicalFieldSchema(alias, fieldSchema, t, exp.getUid());                
-                schema.addField(f);
-                continue;
-            }
-                       
             // for tuple and bag type, if there is projection, calculate schema of this field
-            if (exp instanceof ProjectExpression) {
+            if (exp instanceof ProjectExpression) {                
                 LogicalRelationalOperator op = null;
                 try{
                     op = ((ProjectExpression)exp).findReferent(this);
@@ -74,7 +67,14 @@
                     alias = s.getField(((ProjectExpression)exp).getColNum()).alias;
                 }
             }
-                
+            
+            // if type is primitive, just add to schema
+            if (t != DataType.TUPLE && t != DataType.BAG) {
+                LogicalFieldSchema f = new LogicalSchema.LogicalFieldSchema(alias, fieldSchema, t, exp.getUid());                
+                schema.addField(f);
+                continue;
+            }
+            
             // if flatten is set, set schema of tuple field to this schema
             if (flattenFlags[i]) {
                 if (t == DataType.BAG) {
@@ -91,7 +91,8 @@
                 if (fieldSchema != null) {
                     List<LogicalFieldSchema> ll = fieldSchema.getFields();
                     for(LogicalFieldSchema f: ll) {
-                        schema.addField(f);
+                        LogicalFieldSchema nf = new LogicalSchema.LogicalFieldSchema(alias+"::"+f.alias, f.schema, f.type, f.uid); 
+                        schema.addField(nf);
                     }                               
                 } else {
                     schema = null;
@@ -102,7 +103,6 @@
                  schema.addField(f);  
             }                                                      
         }
-        
         return schema;
     }
 
@@ -148,4 +148,12 @@
             ((LogicalPlanVisitor)v).visitLOGenerate(this);
     }
 
+    /**
+     * Describes this operator by its name and the schema returned by getSchema().
+     * @return text of the form {@code (Name: <name> Schema: <schema>)}
+     */
+    @Override
+    public String toString() {
+        return "(Name: " + name + " Schema: " + getSchema() + ")";
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java Fri Mar  5 21:55:19 2010
@@ -19,6 +19,8 @@
 
 import java.io.IOException;
 
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
 import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.experimental.plan.OperatorPlan;
 import org.apache.pig.experimental.plan.PlanVisitor;
@@ -28,13 +30,19 @@
  * It can only be used in the inner plan of LOForEach
  *
  */
-public class LOInnerLoad extends LogicalRelationalOperator {
-    private int colNum;
+public class LOInnerLoad extends LogicalRelationalOperator {    
+    private ProjectExpression prj; 
     private LOForEach foreach;
 
     public LOInnerLoad(OperatorPlan plan, LOForEach foreach, int colNum) {
-        super("LOInnerLoad", plan);
-        this.colNum = colNum;
+        super("LOInnerLoad", plan);        
+        
+        // store column number as a ProjectExpression in a plan 
+        // to be able to dynamically adjust column number during optimization
+        LogicalExpressionPlan exp = new LogicalExpressionPlan();
+        
+        // we don't care about type, so set to -1
+        prj = new ProjectExpression(exp, (byte)-1, 0, colNum);
         this.foreach = foreach;
     }
 
@@ -47,9 +55,19 @@
         LogicalPlan p = (LogicalPlan)foreach.getPlan();
         try {
             LogicalRelationalOperator op = (LogicalRelationalOperator)p.getPredecessors(foreach).get(0);
-            if (op.getSchema() != null) {
-                schema = new LogicalSchema();                
-                schema.addField(op.getSchema().getField(colNum));
+            LogicalSchema s = op.getSchema();
+            if (s != null) {
+                schema = new LogicalSchema();      
+                long uid = prj.getUid();
+                for(int i=0; i<s.size(); i++) {
+                    if (uid == s.getField(i).uid) {
+                        schema.addField(s.getField(i));
+                    }
+                }
+            }
+            
+            if ( schema != null && schema.size() == 0) {
+                schema = null;
             }
         }catch(Exception e) {
             throw new RuntimeException(e);
@@ -57,6 +75,10 @@
         
         return schema;
     }
+    
+    public LogicalExpressionPlan getExpression() {
+        return (LogicalExpressionPlan)prj.getPlan(); // presumably the plan handed to the ProjectExpression in the constructor - confirm getPlan() returns it
+    }
 
     @Override
     public boolean isEqual(Operator other) {
@@ -64,7 +86,7 @@
             return false;
         }
         
-        return (colNum == ((LOInnerLoad)other).colNum);
+        return (getColNum() == ((LOInnerLoad)other).getColNum());
     }    
     
     @Override
@@ -76,7 +98,7 @@
     }
 
     public int getColNum() {
-        return colNum;
+        return prj.getColNum();
     }
     
     /**

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java Fri Mar  5 21:55:19 2010
@@ -20,14 +20,18 @@
 
 import java.io.IOException;
 
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadPushDown;
 import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 
 public class LOLoad extends LogicalRelationalOperator {
     
     private LogicalSchema scriptSchema;
     private FileSpec fs;
+    private transient LoadPushDown loadFunc;
 
     /**
      * 
@@ -42,6 +46,25 @@
        fs = loader;
     }
     
+    public LoadPushDown getLoadPushDown() {
+        try { 
+            if (loadFunc == null) {
+                // Instantiated lazily; loadFunc stays null when the loader is not a LoadPushDown.
+                Object obj = PigContext.instantiateFuncFromSpec(fs.getFuncSpec());
+                if (obj instanceof LoadPushDown) {
+                    loadFunc = (LoadPushDown)obj;
+                }
+            }
+            return loadFunc;
+        }catch (ClassCastException cce) { // chain the cause so the original stack trace is kept
+            throw new RuntimeException(fs.getFuncSpec() + " should implement the LoadFunc interface.", cce);
+        }
+    }
+    
+    public void setScriptSchema(LogicalSchema schema) {
+        scriptSchema = schema; // stores the reference as-is; no copy is made
+    }
+    
     /**
      * Get the schema for this load.  The schema will be either be what was
      * given by the user in the script or what the load functions getSchema

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java Fri Mar  5 21:55:19 2010
@@ -187,11 +187,6 @@
         exprOp.setColumn(load.getColNum());
         
         // set input to POProject to the predecessor of foreach
-        List<PhysicalOperator> l = new ArrayList<PhysicalOperator>();
-        LOForEach foreach = load.getLOForEach();        
-        Operator pred = foreach.getPlan().getPredecessors(foreach).get(0);
-        l.add(logToPhyMap.get(pred));
-        exprOp.setInputs(l);
         
         logToPhyMap.put(load, exprOp);
         currentPlan.add(exprOp);
@@ -214,19 +209,10 @@
         // we need to translate each predecessor of LOGenerate into a physical plan.
         // The physical plan should contain the expression plan for this predecessor plus
         // the subtree starting with this predecessor
-        for (int i=0; i<preds.size(); i++) {
+        for (int i=0; i<exps.size(); i++) {
             currentPlan = new PhysicalPlan();
-            // translate the predecessors into a physical plan
-            PlanWalker childWalker = new SubtreeDependencyOrderWalker(inner, preds.get(i));
-            pushWalker(childWalker);
-            childWalker.walk(this);
-            popWalker();
-            
-            // get the leaf of partially translated plan
-            PhysicalOperator leaf = currentPlan.getLeaves().get(0);
-            
-            // add up the expressions
-            childWalker = new ReverseDependencyOrderWalker(exps.get(i));
+            // translate the expression plan
+            PlanWalker childWalker = new ReverseDependencyOrderWalker(exps.get(i));
             pushWalker(childWalker);
             childWalker.walk(new ExpToPhyTranslationVisitor(exps.get(i),
                     childWalker, gen, currentPlan, logToPhyMap ));            
@@ -237,29 +223,34 @@
                 PhysicalOperator op = logToPhyMap.get(l);
                 if (l instanceof ProjectExpression) {
                     int input = ((ProjectExpression)l).getInputNum();
+                    
+                    // for each sink projection, get its input logical plan and translate it
                     Operator pred = preds.get(input);
+                    childWalker = new SubtreeDependencyOrderWalker(inner, pred);
+                    pushWalker(childWalker);
+                    childWalker.walk(this);
+                    popWalker();
+                    
+                    // get the physical operator of the leaf of input logical plan
+                    PhysicalOperator leaf = logToPhyMap.get(pred);                    
+                    
                     if (pred instanceof LOInnerLoad) {
-                        List<PhysicalOperator> ll = currentPlan.getSuccessors(op);     
-                        PhysicalOperator[] ll2 = null;
-                        if (ll != null) {
-                            ll2 = ll.toArray(new PhysicalOperator[0]);
-                        }
-                        currentPlan.remove(op);
-                        if (ll2 != null) {                        	
-                            for(PhysicalOperator suc: ll2) {
-                                currentPlan.connect(leaf, suc);
-                            }
-                        }
-                        
-                        innerPlans.add(currentPlan);
-                        
-                        continue;
+                        // if predecessor is only an LOInnerLoad, remove the project that
+                        // comes from LOInnerLoad and change the column of project that
+                        // comes from expression plan
+                        currentPlan.remove(leaf);
+                        logToPhyMap.remove(pred);
+                        ((POProject)op).setColumn( ((POProject)leaf).getColumn() );
+                           
+                    }else{                    
+                        currentPlan.connect(leaf, op);
                     }
                 }
-                
-                currentPlan.connect(leaf, op);                
-                innerPlans.add(currentPlan);
-            }                        
+            }  
+           
+            
+            
+            innerPlans.add(currentPlan);
         }
         
         currentPlan = currentPlans.pop();

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java Fri Mar  5 21:55:19 2010
@@ -18,172 +18,18 @@
 
 package org.apache.pig.experimental.logical.relational;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.pig.experimental.plan.BaseOperatorPlan;
-import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.experimental.plan.OperatorPlan;
-import org.apache.pig.experimental.plan.OperatorSubPlan;
 
 /**
  * LogicalPlan is the logical view of relational operations Pig will execute 
- * for a given script.  Note that it contains only realtional operations.
+ * for a given script.  Note that it contains only relational operations.
  * All expressions will be contained in LogicalExpressionPlans inside
- * each relational operator.  LogicalPlan provides operations for
- * removing and adding LogicalRelationalOperators.  These will handle doing
- * all of the necessary add, remove, connect, and disconnect calls in
- * OperatorPlan.  They will not handle patching up individual relational
- * operators.  That will be handle by the various Patchers.
- *
+ * each relational operator.
  */
 public class LogicalPlan extends BaseOperatorPlan {
     
     /**
-     * Add a relational operation to the plan.
-     * @param before operator that will be before the new operator.  This
-     * operator should already be in the plan.  If before is null then
-     * the new operator will be a root.
-     * @param newOper new operator to add.  This operator should not already
-     * be in the plan.
-     * @param after operator  that will be after the new operator.  This
-     * operator should already be in the plan.  If after is null, then the
-     * new operator will be a leaf.
-     * @throws IOException if add is already in the plan, or before or after
-     * are not in the plan.
-     */
-    public void add(LogicalRelationalOperator before,
-                    LogicalRelationalOperator newOper,
-                    LogicalRelationalOperator after) throws IOException {
-        doAdd(before, newOper, after);
-    }
-   
-    /**
-     * Add a relational operation with multiple outputs to the plan.
-     * @param before operators that will be before the new operator.  These
-     * operator should already be in the plan.
-     * @param newOper new operator to add.  This operator should not already
-     * be in the plan.
-     * @param after operator  that will be after the new operator.  This
-     * operator should already be in the plan.  If after is null, then the
-     * new operator will be a leaf.
-     * @throws IOException if add is already in the plan, or before or after
-     * are not in the plan.
-     */
-    public void add(LogicalRelationalOperator[] before,
-                    LogicalRelationalOperator newOper,
-                    LogicalRelationalOperator after) throws IOException {
-        doAdd(null, newOper, after);
-        
-        for (LogicalRelationalOperator op : before) {
-            checkIn(op);
-            connect(op, newOper);
-        }
-    }
-    
-    /**
-     * Add a relational operation with multiple inputs to the plan.
-     * @param before operator that will be before the new operator.  This
-     * operator should already be in the plan.  If before is null then
-     * the new operator will be a root.
-     * @param newOper new operator to add.  This operator should not already
-     * be in the plan.
-     * @param after operators that will be after the new operator.  These
-     * operator should already be in the plan.
-     * @throws IOException if add is already in the plan, or before or after
-     * are not in the plan.
-     */
-    public void add(LogicalRelationalOperator before,
-                    LogicalRelationalOperator newOper,
-                    LogicalRelationalOperator[] after) throws IOException {
-        doAdd(before, newOper, null);
-        
-        for (LogicalRelationalOperator op : after) {
-            checkIn(op);
-            connect(newOper, op);
-        }
-    }
-    
-    /**
-     * Add a relational operation to the plan when the caller wants to control
-     * how the nodes are connected in the graph.
-     * @param before operator that will be before the new operator.  This
-     * operator should already be in the plan.  before should not be null.
-     * the new operator will be a root.
-     * @param beforeToPos Position in before's edges to connect newOper at.
-     * @param beforeFromPos Position in newOps's edges to connect before at.
-     * @param newOper new operator to add.  This operator should not already
-     * be in the plan.
-     * @param afterToPos Position in after's edges to connect newOper at.
-     * @param afterFromPos Position in newOps's edges to connect after at.
-     * @param after operator  that will be after the new operator.  This
-     * operator should already be in the plan.  If after is null, then the
-     * new operator will be a leaf.
-     * @throws IOException if add is already in the plan, or before or after
-     * are not in the plan.
-     */
-    public void add(LogicalRelationalOperator before,
-                    int beforeToPos,
-                    int beforeFromPos,
-                    LogicalRelationalOperator newOper,
-                    int afterToPos,
-                    int afterFromPos,
-                    LogicalRelationalOperator after) throws IOException {
-        if (before != null) checkIn(before);
-        if (after != null) checkIn(after);
-        checkNotIn(newOper);
-        
-        add(newOper);
-        if (before != null) connect(before, beforeToPos, newOper, beforeFromPos);
-        if (after != null) connect(newOper, afterToPos, after, afterFromPos);
-        
-    }
-    
-    /**
-     * Remove an operator from the logical plan.  This call will take care
-     * of disconnecting the operator, connecting the predecessor(s) and 
-     * successor(s) and patching up the plan. 
-     * @param op operator to be removed.
-     * @throws IOException If the operator is not in the plan.
-     */
-    public void removeLogical(LogicalRelationalOperator op) throws IOException {
-        
-        checkIn(op);
-        List<Operator> pred = getPredecessors(op);
-        List<Operator> succ = getSuccessors(op);
-        int predSz = pred.size();
-        int succSz = succ.size();
-        if (predSz > 1 && succSz > 1) {
-            // Don't have a clue what to do here.  We shouldn't have any
-            // operators that have multiple inputs and multiple outputs.
-            throw new IOException("Attempt to remove a node with multiple "
-                + "inputs and outputs!");
-        }
-        
-        // Disconnect and remove the given node.
-        for (Operator p : pred) {
-            disconnect(p, op);
-        }
-        for (Operator s : succ) {
-            disconnect(op, s);
-        }
-        remove(op);
-        
-        // Now reconnect the before and after
-        if (predSz > 1 && succSz == 1) {
-            for (Operator p : pred) {
-                connect(p, succ.get(0));
-            }
-        } else if (predSz == 1 && succSz >= 1) {
-            for (Operator s : succ) {
-                connect(pred.get(0), s);
-            }
-        }
-        
-    }
-    
-    /**
      * Equality is checked by calling equals on every leaf in the plan.  This
      * assumes that plans are always connected graphs.  It is somewhat 
      * inefficient since every leaf will test equality all the way to 
@@ -205,29 +51,4 @@
         return super.isEqual(other);   
     }
     
-    private void doAdd(LogicalRelationalOperator before,
-                       LogicalRelationalOperator newOper,
-                       LogicalRelationalOperator after) throws IOException {
-        if (before != null) checkIn(before);
-        if (after != null) checkIn(after);
-        checkNotIn(newOper);
-        
-        add(newOper);
-        if (before != null) connect(before, newOper);
-        if (after != null) connect(newOper, after);
-    }
-    
-    private void checkIn(LogicalRelationalOperator op) throws IOException {
-        if (!ops.contains(op)) {
-            throw new IOException("Attempt to use operator " + op.getName() + 
-                " which is not in the plan.");
-        }
-    }
-    
-     private void checkNotIn(LogicalRelationalOperator op) throws IOException {
-        if (ops.contains(op)) {
-            throw new IOException("Attempt to add operator " + op.getName() + 
-                " which is already in the plan.");
-        }
-     }           
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java Fri Mar  5 21:55:19 2010
@@ -140,5 +140,13 @@
             if (!s.isEqual(os)) return false;
         }
         return true;
-    } 
+    }
+    
+    public String toString() {
+        StringBuilder msg = new StringBuilder();
+
+        msg.append("(Name: " + name + " Schema: " + getSchema() + ")");
+
+        return msg.toString();
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java Fri Mar  5 21:55:19 2010
@@ -23,6 +23,9 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.util.Pair;
+
 /**
  * Schema, from a logical perspective.
  */
@@ -60,16 +63,31 @@
                 return false;
             }
         }
+        
+        public String toString() {
+            if( type == DataType.BAG ) {
+                if( schema == null ) {
+                    return ( alias + "#" + uid + ":bag{}#" );
+                }
+                return ( alias + "#" + uid + ":bag{" + schema.toString() + "}" );
+            } else if( type == DataType.TUPLE ) {
+                if( schema == null ) {
+                    return ( alias + "#" + uid + ":tuple{}" );
+                }
+                return ( alias + "#" + uid + ":tuple(" + schema.toString() + ")" );
+            }
+            return ( alias + "#" + uid + ":" + DataType.findTypeName(type) );
+        }
     }
 
     
     
     private List<LogicalFieldSchema> fields;
-    private Map<String, Integer> aliases;
+    private Map<String, Pair<Integer, Boolean>> aliases;
     
     public LogicalSchema() {
         fields = new ArrayList<LogicalFieldSchema>();
-        aliases = new HashMap<String, Integer>();
+        aliases = new HashMap<String, Pair<Integer, Boolean>>();
     }
     
     /**
@@ -79,16 +97,25 @@
     public void addField(LogicalFieldSchema field) {
         fields.add(field);
         if (field.alias != null && !field.alias.equals("")) {
-            aliases.put(field.alias, fields.size() - 1);
+            // put the full name of this field into the aliases map;
+            // the boolean in the pair indicates whether the alias is a full name
+            aliases.put(field.alias, new Pair<Integer, Boolean>(fields.size()-1, true));
             int index = 0;
+            
+            // check and put short names into alias map if there is no conflict
             while(index != -1) {
                 index = field.alias.indexOf("::", index);
                 if (index != -1) {
                     String a = field.alias.substring(index+2);
                     if (aliases.containsKey(a)) {
-                        aliases.remove(a);
+                        // remove the conflicting entry only if it is a short name;
+                        // a full name must never be removed
+                        if (!aliases.get(a).second) {
+                            aliases.remove(a);
+                        }
                     }else{
-                        aliases.put(a, fields.size()-1);                       
+                        // put alias into map and indicate it is a short name
+                        aliases.put(a, new Pair<Integer, Boolean>(fields.size()-1, false));                       
                     }
 
                     index = index +2;
@@ -103,12 +130,12 @@
      * @return field associated with alias, or null if no such field
      */
     public LogicalFieldSchema getField(String alias) {
-        Integer index = aliases.get(alias);
-        if (index == null) {
+        Pair<Integer, Boolean> p = aliases.get(alias);
+        if (p == null) {
             return null;
         }
 
-        return fields.get(index);
+        return fields.get(p.first);
     }
 
     /**
@@ -165,4 +192,16 @@
         return null;
     }
     
+    public String toString() {
+        StringBuilder str = new StringBuilder();
+        
+        for( LogicalFieldSchema field : fields ) {
+            str.append( field.toString() + "," );
+        }
+        if( fields.size() != 0 ) {
+            str.deleteCharAt( str.length() -1 );
+        }
+        return str.toString();
+    }
+    
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java Fri Mar  5 21:55:19 2010
@@ -38,6 +38,12 @@
 import org.apache.pig.experimental.plan.optimizer.Transformer;
 import org.apache.pig.impl.util.Pair;
 
+/**
+ * This rule moves a Filter above a Foreach.
+ * It checks whether the uids that the filter works on
+ * are present in the predecessor of the foreach.
+ * If so, it transforms the plan.
+ */
 public class FilterAboveForeach extends Rule {
 
     public FilterAboveForeach(String n) {
@@ -47,7 +53,7 @@
     @Override
     protected OperatorPlan buildPattern() {
         // the pattern that this rule looks for
-        // is foreach -> flatten -> filter
+        // is foreach -> filter
         LogicalPlan plan = new LogicalPlan();
         LogicalRelationalOperator foreach = new LOForEach(plan);
         LogicalRelationalOperator filter = new LOFilter(plan);
@@ -85,25 +91,6 @@
             // This would be a strange case
             if( foreach == null ) return false;
             
-            List<Operator> sinks = foreach.getInnerPlan().getSinks();            
-            if( ! ( sinks.size() == 1 && (sinks.get(0) instanceof LOGenerate ) ) ) {
-                return false;
-            }
-
-//            LOGenerate generate = (LOGenerate)sinks.get(0);
-//            // We check if we have any flatten
-//            // Other cases are handled by other Optimizers
-//            boolean hasFlatten = false;            
-//            for( boolean flattenFlag : generate.getFlattenFlags() ) {
-//                if( flattenFlag ) {
-//                    hasFlatten = true;
-//                    break;
-//                }
-//            }
-//
-//            if( !hasFlatten )
-//                return false;             
-            
             iter = matched.getOperators();
             while( iter.hasNext() ) {
                 Operator op = iter.next();
@@ -147,6 +134,11 @@
             return false;            
         }
         
+        /**
+         * Get all uids from Projections of this FilterOperator
+         * @param filter
+         * @return Set of uid
+         */
         private Set<Long> getFilterProjectionUids( LOFilter filter ) {
             Set<Long> uids = new HashSet<Long>();
             if( filter != null ) {
@@ -163,7 +155,12 @@
             return uids;
         }
         
-        // check if a relational operator contains all of the specified uids
+        /**
+         * checks if a relational operator contains all of the specified uids
+         * @param op LogicalRelational operator that should contain the uid
+         * @param uids Uids to check for
+         * @return true if given LogicalRelationalOperator has all the given uids
+         */
         private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {
             LogicalSchema schema = op.getSchema();
             List<LogicalSchema.LogicalFieldSchema> fields = schema.getFields();
@@ -223,14 +220,17 @@
             /*
              *          ForEachPred
              *               |
-             *            ForEach
+             *            ForEach         
              *               |
              *             Filter*
+             *      ( These are filters
+             *      which cannot be moved )
              *               |
              *           FilterPred                 
-             *  ( has to be a Filter or ForEach )
+             *         ( is a Filter )
              *               |
              *             Filter
+             *        ( To be moved ) 
              *               |
              *            FilterSuc
              *              
@@ -243,15 +243,23 @@
              *            ForEachPred
              *               |
              *            Filter
+             *     ( After being Moved )
              *               |
              *            ForEach
              *               |
              *             Filter*
+             *       ( These are filters
+             *      which cannot be moved )
              *               |
              *           FilterPred                 
-             *  ( has to be a Filter or ForEach )
+             *         ( is a Filter )
              *               |
              *            FilterSuc
+             *            
+             *  The plan above assumes we are moving a filter in the middle.
+             *  If we are moving the first filter after ForEach then
+             *  -- Filter* (kleene star) matches zero filters
+             *  -- and ForEach is the FilterPred
              */
             
             Pair<Integer, Integer> forEachPredPlaces = currentPlan.disconnect(forEachPred, foreach);

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java Fri Mar  5 21:55:19 2010
@@ -19,6 +19,7 @@
 package org.apache.pig.experimental.plan;
 
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -27,6 +28,10 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.experimental.logical.optimizer.PlanPrinter;
+import org.apache.pig.impl.logicalLayer.DotLOPrinter;
+import org.apache.pig.impl.logicalLayer.LOPrinter;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.Pair;
 
 public abstract class BaseOperatorPlan implements OperatorPlan {
@@ -258,4 +263,13 @@
         return false;
     }
     
+    public void explain(PrintStream ps, String format, boolean verbose) 
+    throws IOException {
+        ps.println("#-----------------------------------------------");
+        ps.println("# New Logical Plan:");
+        ps.println("#-----------------------------------------------");
+
+        PlanPrinter npp = new PlanPrinter(this, ps);
+        npp.visit();
+}
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java Fri Mar  5 21:55:19 2010
@@ -74,6 +74,15 @@
     }
 
     /**
+     * Remove an annotation
+     * @param key the key of the annotation
+     * @return the original value of the annotation
+     */
+    public Object removeAnnotation(String key) {
+        return annotations.remove(key);
+    }
+    
+    /**
      * This is like a shallow equals comparison.
      * It returns true if two operators have equivalent properties even if they are 
      * different objects. Here properties mean equivalent plan and equivalent name.

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java Fri Mar  5 21:55:19 2010
@@ -117,7 +117,7 @@
      * This is like a shallow comparison.
      * Two plans are equal if they have equivalent operators and equivalent 
      * structure.
-     * @param other object to compare
+     * @param other  object to compare
      * @return boolean if both the plans are equivalent
      */
     public boolean isEqual( OperatorPlan other );

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorSubPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorSubPlan.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorSubPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorSubPlan.java Fri Mar  5 21:55:19 2010
@@ -53,6 +53,8 @@
     
     public void add(Operator op) {
         operators.add(op);
+        leaves.clear();
+        roots.clear();
     }
 
     public void connect(Operator from, int fromPos, Operator to, int toPos) {

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java Fri Mar  5 21:55:19 2010
@@ -138,7 +138,6 @@
             assertTrue("Failed to set a valid uid", false );
         }
         
-        
         // run filter rule
         Rule r = new FilterAboveForeach("FilterAboveFlatten");
         Set<Rule> s = new HashSet<Rule>();
@@ -357,6 +356,8 @@
         
         assertTrue( plan.getPredecessors(filter).contains(load) );
         assertEquals( 1, plan.getPredecessors(filter).size() );
+        assertEquals( load.getSchema().getField(0).uid, namePrj2.getUid() );
+        assertEquals( namePrj2.getUid(), prjName.getUid() );
         
         assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
         assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
@@ -528,6 +529,10 @@
         assertTrue( plan.getPredecessors(filter).contains(filter2) );
         assertEquals( 1, plan.getPredecessors(filter).size() );
         
+        assertEquals( load.getSchema().getField(0).uid, namePrj2.getUid() );
+        assertEquals( namePrj2.getUid(), name2Prj2.getUid() );
+        assertEquals( name2Prj2.getUid(), prjName.getUid() );
+        
         assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
         assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
         assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
@@ -694,6 +699,9 @@
         assertTrue( plan.getPredecessors(foreach).contains(load) );
         assertEquals( 1, plan.getPredecessors(foreach).size() );
         
+        assertFalse( prjCuisines.getUid() == namePrj2.getUid() );
+        assertFalse( prjCuisines.getUid() == name2Prj2.getUid() );
+        
         assertTrue( plan.getPredecessors(filter).contains(filter2) );
         assertEquals( 1, plan.getPredecessors(filter).size() );
         
@@ -836,6 +844,8 @@
         assertTrue( plan.getPredecessors(stor).contains(filter) ); 
         assertEquals( 1, plan.getPredecessors(stor).size() );
         
+        assertFalse( prjCuisines.getUid() == namePrj2.getUid() );
+        
         assertTrue( plan.getPredecessors(filter).contains(foreach) );
         assertEquals( 1, plan.getPredecessors(filter).size() );
         
@@ -985,6 +995,9 @@
         assertTrue( plan.getPredecessors(foreach).contains(filter) );
         assertEquals( 1, plan.getPredecessors(foreach).size() );
         
+        assertEquals( load.getSchema().getField(0).uid , namePrj2.getUid() );
+        assertEquals( namePrj2.getUid(), prjName.getUid() );
+        
         assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
         assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
         assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
@@ -1004,201 +1017,4 @@
         }
         
     }
-    
-//    public class MyPrintVisitor extends AllExpressionVisitor {
-//
-//        private PrintStream mStream = null;
-//        private String TAB1 = "    ";
-//        private String TABMore = "|   ";
-//        private String LSep = "|\n|---";
-//        private String USep = "|   |\n|   ";
-//        private int levelCntr = -1;
-//        private boolean isVerbose = true;
-//        
-//        public MyPrintVisitor(OperatorPlan plan, PrintStream ps) {
-//            super(plan, new DepthFirstWalker(plan));
-//            mStream = ps;
-//        }
-//
-//        @Override
-//        protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
-//            // TODO Auto-generated method stub
-//            return null;
-//        }      
-//        
-//        @Override
-//        public void visit() throws VisitorException {
-//            try {
-//                mStream.write(depthFirstLP().getBytes());
-//            } catch (IOException e) {
-//                throw new VisitorException(e);
-//            }
-//        }
-//
-//        public void setVerbose(boolean verbose) {
-//            isVerbose = verbose;
-//        }
-//
-//        public void print(OutputStream printer) throws VisitorException, IOException {
-//            printer.write(depthFirstLP().getBytes());
-//        }
-//
-//        class LogicalRelationalOperatorCompare implements Comparator<LogicalRelationalOperator> {
-//
-//            @Override
-//            public int compare(LogicalRelationalOperator o1,
-//                    LogicalRelationalOperator o2) {
-//                return 0;
-//            }
-//            
-//        }
-//
-//        protected String depthFirstLP() throws VisitorException, IOException {
-//            StringBuilder sb = new StringBuilder();
-//            List<Operator> leaves = plan.getSinks();
-//            // Collections.sort(leaves, c)
-//            for (Operator leaf : leaves) {
-//                sb.append(depthFirst(leaf));
-//                sb.append("\n");
-//            }
-//            //sb.delete(sb.length() - "\n".length(), sb.length());
-//            //sb.delete(sb.length() - "\n".length(), sb.length());
-//            return sb.toString();
-//        }
-//        
-//        private String planString(LogicalPlan lp) throws VisitorException, IOException {
-//            StringBuilder sb = new StringBuilder();
-//            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-//            if(lp!=null)
-//                lp.explain(baos, mStream);
-//            else
-//                return "";
-//            sb.append(USep);
-//            sb.append(shiftStringByTabs(baos.toString(), 2));
-//            return sb.toString();
-//        }
-//        
-//        private String planString(
-//                List<LogicalPlan> logicalPlanList) throws VisitorException, IOException {
-//            StringBuilder sb = new StringBuilder();
-//            if(logicalPlanList!=null)
-//                for (LogicalPlan lp : logicalPlanList) {
-//                    sb.append(planString(lp));
-//                }
-//            return sb.toString();
-//        }
-//
-//        private String depthFirst(Operator node) throws VisitorException, IOException {
-//            StringBuilder sb = new StringBuilder(node.getName());
-//            if(node instanceof LogicalExpression) {
-//                sb.append(" FieldSchema: ");
-//                try {
-//                    sb.append(((LogicalExpression)node).getUid());
-//                    sb.append(" Type: " + DataType.findTypeName(((LogicalExpression)node).getType()));
-//                } catch (Exception e) {
-//                    sb.append("Caught Exception: " + e.getMessage());
-//                }
-//            } else if( node instanceof LogicalRelationalOperator ){
-//                sb.append(" Schema: ");
-//                try {
-//                    sb.append(((LogicalRelationalOperator)node).getSchema());
-//                } catch (Exception e) {
-//                    sb.append("Caught exception: " + e.getMessage());
-//                }
-//            }
-//
-//            sb.append("\n");
-//
-//            if (isVerbose) {
-//                if(node instanceof LOFilter){
-//                    sb.append(planString(((LOFilter)node).getComparisonPlan()));
-//                }
-//                else if(node instanceof LOForEach){
-//                    sb.append(planString(((LOForEach)node).getForEachPlans()));        
-//                }
-//                else if(node instanceof LOGenerate){
-//                    sb.append(planString(((LOGenerate)node).getGeneratePlans())); 
-//                    
-//                }
-//                else if(node instanceof LOCogroup){
-//                    MultiMap<LogicalOperator, LogicalPlan> plans = ((LOCogroup)node).getGroupByPlans();
-//                    for (LogicalOperator lo : plans.keySet()) {
-//                        // Visit the associated plans
-//                        for (LogicalPlan plan : plans.get(lo)) {
-//                            sb.append(planString(plan));
-//                        }
-//                    }
-//                }
-//                else if(node instanceof LOJoin){
-//                    MultiMap<LogicalOperator, LogicalPlan> plans = ((LOJoin)node).getJoinPlans();
-//                    for (LogicalOperator lo : plans.keySet()) {
-//                        // Visit the associated plans
-//                        for (LogicalPlan plan : plans.get(lo)) {
-//                            sb.append(planString(plan));
-//                        }
-//                    }
-//                }
-//                else if(node instanceof LOJoin){
-//                    MultiMap<LogicalOperator, LogicalPlan> plans = ((LOJoin)node).getJoinPlans();
-//                    for (LogicalOperator lo : plans.keySet()) {
-//                        // Visit the associated plans
-//                        for (LogicalPlan plan : plans.get(lo)) {
-//                            sb.append(planString(plan));
-//                        }
-//                    }
-//                }
-//                else if(node instanceof LOSort){
-//                    sb.append(planString(((LOSort)node).getSortColPlans())); 
-//                }
-//                else if(node instanceof LOSplitOutput){
-//                    sb.append(planString(((LOSplitOutput)node).getConditionPlan()));
-//                }
-//                else if (node instanceof LOProject) {
-//                    sb.append("Input: ");
-//                    sb.append(((LOProject)node).getExpression().name());
-//                }
-//            }
-//            
-//            List<LogicalOperator> originalPredecessors =  mPlan.getPredecessors(node);
-//            if (originalPredecessors == null)
-//                return sb.toString();
-//            
-//            List<LogicalOperator> predecessors =  new ArrayList<LogicalOperator>(originalPredecessors);
-//            
-//            Collections.sort(predecessors);
-//            int i = 0;
-//            for (LogicalOperator pred : predecessors) {
-//                i++;
-//                String DFStr = depthFirst(pred);
-//                if (DFStr != null) {
-//                    sb.append(LSep);
-//                    if (i < predecessors.size())
-//                        sb.append(shiftStringByTabs(DFStr, 2));
-//                    else
-//                        sb.append(shiftStringByTabs(DFStr, 1));
-//                }
-//            }
-//            return sb.toString();
-//        }
-//
-//        private String shiftStringByTabs(String DFStr, int TabType) {
-//            StringBuilder sb = new StringBuilder();
-//            String[] spl = DFStr.split("\n");
-//
-//            String tab = (TabType == 1) ? TAB1 : TABMore;
-//
-//            sb.append(spl[0] + "\n");
-//            for (int i = 1; i < spl.length; i++) {
-//                sb.append(tab);
-//                sb.append(spl[i]);
-//                sb.append("\n");
-//            }
-//            return sb.toString();
-//        }
-//
-//        private void dispTabs() {
-//            for (int i = 0; i < levelCntr; i++)
-//                System.out.print(TAB1);
-//        }
-//    }
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java Fri Mar  5 21:55:19 2010
@@ -120,14 +120,18 @@
         LOFilter D = new LOFilter(lp, filterPlan);
         D.neverUseForRealSetSchema(cschema);
         // Connect D to B, since the transform has happened.
-        lp.add(B, D, (LogicalRelationalOperator)null);
+        lp.add(D);
+        lp.connect(B, D);
         
         // Now add in C, connected to A and D.
-        lp.add(new LogicalRelationalOperator[] {A, D}, C, null);
+        lp.add(C);
+        lp.connect(A, C);
+        lp.connect(D, C);
         
         changedPlan = new LogicalPlan();
         changedPlan.add(D);
-        changedPlan.add(D, C, (LogicalRelationalOperator)null);
+        changedPlan.add(C);
+        changedPlan.connect(D, C);
     }
     
     private static class SillySameVisitor extends AllSameVisitor {
@@ -156,7 +160,6 @@
     public void testAllSameVisitor() throws IOException {
         SillySameVisitor v = new SillySameVisitor(lp);
         v.visit();
-        System.out.println(v.toString());
         assertTrue("LOLoad LOJoin LOLoad LOFilter ".equals(v.toString()) ||
             "LOLoad LOFilter LOJoin LOLoad ".equals(v.toString()));