You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/02/18 23:20:09 UTC

svn commit: r911616 [4/7] - in /hadoop/pig/branches/load-store-redesign: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/ src/org/apach...

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/MergeFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/MergeFilter.java?rev=911616&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/MergeFilter.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/MergeFilter.java Thu Feb 18 22:20:07 2010
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+public class MergeFilter extends Rule {
+
+    public MergeFilter(String n) {
+        super(n);       
+    }
+
+    @Override
+    public Transformer getNewTransformer() {        
+        return new MergeFilterTransformer();
+    }
+
+    public class MergeFilterTransformer extends Transformer {
+
+        private OperatorSubPlan subPlan;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {           
+            LOFilter filter = (LOFilter)matched.getSources().get(0);
+            List<Operator> succeds = currentPlan.getSuccessors(filter);
+            // if this filter is followed by another filter, we should combine them
+            if (succeds != null && succeds.size() == 1) {
+                if (succeds.get(0) instanceof LOFilter) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {     
+            subPlan = new OperatorSubPlan(currentPlan);
+            
+            LOFilter filter = (LOFilter)matched.getSources().get(0);
+
+            subPlan.add(filter);
+            
+            List<Operator> succeds = currentPlan.getSuccessors(filter);
+            if (succeds != null && succeds.size()== 1 && (succeds.get(0) instanceof LOFilter)) {
+                LOFilter next = (LOFilter)succeds.get(0);
+                combineFilterCond(filter, next);
+                Pair<Integer, Integer> p1 = currentPlan.disconnect(filter, next);
+                List<Operator> ll = currentPlan.getSuccessors(next);
+                if (ll!= null && ll.size()>0) {
+                    Operator op = ll.get(0);
+                    Pair<Integer, Integer> p2 = currentPlan.disconnect(next, op);
+                    currentPlan.connect(filter, p1.first, op, p2.second);
+                    subPlan.add(op);
+                }
+                
+                currentPlan.remove(next);
+            }                            
+        }        
+        
+        @Override
+        public OperatorPlan reportChanges() {          
+            return subPlan;
+        }
+        
+        // combine the condition of two filters. The condition of second filter
+        // is added into the condition of first filter with an AND operator.
+        private void combineFilterCond(LOFilter f1, LOFilter f2) throws IOException {
+            LogicalExpressionPlan p1 = f1.getFilterPlan();
+            LogicalExpressionPlan p2 = f2.getFilterPlan();
+            LogicalExpressionPlan andPlan = new LogicalExpressionPlan();
+            
+            // add existing operators          
+            Iterator<Operator> iter = p1.getOperators();
+            while(iter.hasNext()) {
+                andPlan.add(iter.next());
+            }
+            
+            iter = p2.getOperators();
+            while(iter.hasNext()) {
+                andPlan.add(iter.next());
+            }
+            
+            // add all connections
+            iter = p1.getOperators();
+            while(iter.hasNext()) {
+                Operator n = iter.next();
+                List<Operator> l = p1.getPredecessors(n);
+                if (l != null) {
+                    for(Operator op: l) {
+                        andPlan.connect(op, n);
+                    }
+                }
+            }
+            
+            iter = p2.getOperators();
+            while(iter.hasNext()) {
+                Operator n = iter.next();
+                List<Operator> l = p2.getPredecessors(n);
+                if (l != null) {
+                    for(Operator op: l) {
+                        andPlan.connect(op, n);
+                    }
+                }
+            }          
+            
+            // create an AND
+            new AndExpression(andPlan, (LogicalExpression)p1.getSources().get(0), (LogicalExpression)p2.getSources().get(0));          
+            
+            f1.setFilterPlan(andPlan);
+        }
+
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {        
+        // the pattern that this rule looks for
+        // is filter operator
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator op = new LOFilter(plan);
+        plan.add(op);        
+        
+        return plan;
+    }
+}
+

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java?rev=911616&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java Thu Feb 18 22:20:07 2010
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Optimizer rule that pushes a filter which follows an inner join above the
+ * join. The filter is moved onto the farthest ancestor of the join whose
+ * schema contains every field (by uid) that the filter condition projects.
+ */
+public class PushUpFilter extends Rule {
+
+    public PushUpFilter(String n) {
+        super(n);
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new PushUpFilterTransformer();
+    }
+
+    public class PushUpFilterTransformer extends Transformer {
+
+        // Operators touched by the last transform() call; see reportChanges().
+        private OperatorSubPlan subPlan;
+
+        /**
+         * The rule applies when every input of the join is inner and some
+         * filter in the chain of filters directly after the join only uses
+         * fields that one of the join's inputs provides.
+         */
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            // only inner joins are handled
+            LOJoin join = (LOJoin)matched.getSources().get(0);
+            for (boolean inner : join.getInnerFlags()) {
+                if (!inner) {
+                    return false;
+                }
+            }
+
+            List<Operator> preds = currentPlan.getPredecessors(join);
+            if (preds == null) {
+                return false;
+            }
+
+            Operator next = matched.getSinks().get(0);
+            while (next instanceof LOFilter) {
+                LOFilter filter = (LOFilter)next;
+                Set<Long> uids = collectUids(filter.getFilterPlan());
+                for (Operator pred : preds) {
+                    if (hasAll((LogicalRelationalOperator)pred, uids)) {
+                        return true;
+                    }
+                }
+                // this filter can not move up; check the next filter
+                next = firstSuccessor(filter);
+            }
+
+            return false;
+        }
+
+        /**
+         * Moves the first movable filter after the join onto the edge just
+         * below the farthest ancestor that still has all of the filter's fields.
+         */
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {
+            subPlan = new OperatorSubPlan(currentPlan);
+
+            LOJoin join = (LOJoin)matched.getSources().get(0);
+            subPlan.add(join);
+
+            Operator next = matched.getSinks().get(0);
+            while (next instanceof LOFilter) {
+                LOFilter filter = (LOFilter)next;
+                subPlan.add(filter);
+
+                Set<Long> uids = collectUids(filter.getFilterPlan());
+
+                // Find the farthest predecessor that has all the fields.
+                // 'child' tracks the operator directly below 'input' on the path
+                // walked up from the join; the filter goes on the edge input -> child.
+                LogicalRelationalOperator input = join;
+                Operator child = null;
+                List<Operator> preds = currentPlan.getPredecessors(input);
+                while (preds != null) {
+                    LogicalRelationalOperator found = null;
+                    for (Operator p : preds) {
+                        if (hasAll((LogicalRelationalOperator)p, uids)) {
+                            found = (LogicalRelationalOperator)p;
+                            break;
+                        }
+                    }
+                    if (found == null) {
+                        break;
+                    }
+                    child = input;
+                    input = found;
+                    subPlan.add(input);
+                    preds = currentPlan.getPredecessors(input);
+                }
+
+                if (input != join) {
+                    moveFilter(filter, input, child);
+                    return;
+                }
+
+                next = firstSuccessor(filter);
+            }
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            return subPlan;
+        }
+
+        // Unhooks filter from its current position, wiring its predecessor
+        // straight to all of its successors, then inserts it on input -> child.
+        private void moveFilter(LOFilter filter, Operator input, Operator child)
+                throws IOException {
+            Operator pred = currentPlan.getPredecessors(filter).get(0);
+            Pair<Integer, Integer> p1 = currentPlan.disconnect(pred, filter);
+
+            List<Operator> succs = currentPlan.getSuccessors(filter);
+            if (succs != null) {
+                // copy: disconnect() may modify the returned list
+                Operator[] consumers = succs.toArray(new Operator[succs.size()]);
+                for (int i = 0; i < consumers.length; i++) {
+                    subPlan.add(consumers[i]);
+                    Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, consumers[i]);
+                    currentPlan.connect(pred, p1.first + i, consumers[i], p2.second);
+                }
+            }
+
+            // Use the edge on the path walked up from the join rather than the
+            // first successor of input, which may belong to another branch.
+            Pair<Integer, Integer> p3 = currentPlan.disconnect(input, child);
+            currentPlan.connect(input, p3.first, filter, 0);
+            currentPlan.connect(filter, 0, child, p3.second);
+        }
+
+        // Returns the first successor of op, or null if it has none.
+        private Operator firstSuccessor(Operator op) throws IOException {
+            List<Operator> l = currentPlan.getSuccessors(op);
+            return (l == null || l.isEmpty()) ? null : l.get(0);
+        }
+
+        // Collects the uids of every ProjectExpression in the filter plan.
+        private Set<Long> collectUids(LogicalExpressionPlan filterPlan) {
+            Set<Long> uids = new HashSet<Long>();
+            Iterator<Operator> iter = filterPlan.getOperators();
+            while (iter.hasNext()) {
+                Operator op = iter.next();
+                if (op instanceof ProjectExpression) {
+                    long uid = ((ProjectExpression)op).getUid();
+                    uids.add(uid);
+                }
+            }
+            return uids;
+        }
+
+        // Checks whether the schema of a relational operator contains all of
+        // the specified uids; an operator without a schema never qualifies.
+        private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {
+            LogicalSchema schema = op.getSchema();
+            if (schema == null) {
+                return false;
+            }
+            Set<Long> all = new HashSet<Long>();
+            for (LogicalSchema.LogicalFieldSchema f : schema.getFields()) {
+                all.add(f.uid);
+            }
+            return all.containsAll(uids);
+        }
+
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {
+        // the pattern that this rule looks for
+        // is join -> filter
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator op1 = new LOJoin(plan);
+        LogicalRelationalOperator op2 = new LOFilter(plan);
+        plan.add(op1);
+        plan.add(op2);
+        plan.connect(op1, op2);
+
+        return plan;
+    }
+}
+

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/SplitFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/SplitFilter.java?rev=911616&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/SplitFilter.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/SplitFilter.java Thu Feb 18 22:20:07 2010
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+public class SplitFilter extends Rule {    
+
+    public SplitFilter(String n) {
+        super(n);       
+    }
+
+    @Override
+    public Transformer getNewTransformer() {        
+        return new SplitFilterTransformer();
+    }
+
+    public class SplitFilterTransformer extends Transformer {
+        private OperatorSubPlan subPlan;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            LOFilter filter = (LOFilter)matched.getSources().get(0);
+            LogicalExpressionPlan cond = filter.getFilterPlan();
+            LogicalExpression root = (LogicalExpression) cond.getSources().get(0);
+            if (root instanceof AndExpression) {
+                return true;
+            }
+            
+            return false;
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {
+            subPlan = new OperatorSubPlan(currentPlan);
+            
+            // split one LOFilter into 2 by "AND"
+            LOFilter filter = (LOFilter)matched.getSources().get(0);
+            LogicalExpressionPlan cond = filter.getFilterPlan();
+            LogicalExpression root = (LogicalExpression) cond.getSources().get(0);
+            if (!(root instanceof AndExpression)) {
+                return;
+            }
+            LogicalExpressionPlan op1 = new LogicalExpressionPlan();
+            op1.add((LogicalExpression)cond.getSuccessors(root).get(0));
+            fillSubPlan(cond, op1, (LogicalExpression)cond.getSuccessors(root).get(0));
+            
+            LogicalExpressionPlan op2 = new LogicalExpressionPlan();
+            op2.add((LogicalExpression)cond.getSuccessors(root).get(1));
+            fillSubPlan(cond, op2, (LogicalExpression)cond.getSuccessors(root).get(1));
+            
+            filter.setFilterPlan(op1);
+            LOFilter filter2 = new LOFilter((LogicalPlan)currentPlan, op2);
+            currentPlan.add(filter2);
+            
+            Operator succed = null;
+            try {
+                List<Operator> succeds = currentPlan.getSuccessors(filter);
+                if (succeds != null) {
+                    succed = succeds.get(0);
+                    subPlan.add(succed);
+                    Pair<Integer, Integer> p = currentPlan.disconnect(filter, succed);
+                    currentPlan.connect(filter2, 0, succed, p.second);
+                    currentPlan.connect(filter, p.first, filter2, 0); 
+                } else {
+                    currentPlan.connect(filter, 0, filter2, 0); 
+                }
+            }catch(Exception e) {
+                throw new IOException(e);
+            }                       
+            
+            subPlan.add(filter);
+            subPlan.add(filter2);            
+        }
+        
+        @Override
+        public OperatorPlan reportChanges() {
+            return subPlan;
+        }
+        
+        private void fillSubPlan(OperatorPlan origPlan, 
+                OperatorPlan subPlan, Operator startOp) throws IOException {
+                       
+            List<Operator> l = origPlan.getSuccessors(startOp);
+            if (l != null) {
+                for(Operator le: l) {
+                    subPlan.add(le);
+                    subPlan.connect(startOp, le);
+                    fillSubPlan(origPlan, subPlan, le);
+                }            
+            }
+        }
+
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {        
+        // the pattern that this rule looks for
+        // is filter
+        LogicalPlan plan = new LogicalPlan();      
+        LogicalRelationalOperator op2 = new LOFilter(plan);
+        plan.add(op2);
+        
+        return plan;
+    }
+}
+

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java Thu Feb 18 22:20:07 2010
@@ -60,9 +60,9 @@
      * @return all operators in the plan that have no predecessors, or
      * an empty list if the plan is empty.
      */
-    public List<Operator> getRoots() {
+    public List<Operator> getSources() {
         if (roots.size() == 0 && ops.size() > 0) {
-            for (Operator op : ops) {
+            for (Operator op : ops) {               
                 if (toEdges.get(op) == null) {
                     roots.add(op);
                 }
@@ -76,7 +76,7 @@
      * @return all operators in the plan that have no successors, or
      * an empty list if the plan is empty.
      */
-    public List<Operator> getLeaves() {
+    public List<Operator> getSinks() {
         if (leaves.size() == 0 && ops.size() > 0) {
             for (Operator op : ops) {
                 if (fromEdges.get(op) == null) {
@@ -200,5 +200,62 @@
     public Iterator<Operator> getOperators() {
         return ops.iterator();
     }
-
+   
+    /**
+     * Structurally compares this plan with another one.
+     * @param other plan to compare against; may be null
+     * @return true if both plans have equivalent operators and structure
+     */
+    public boolean isEqual(OperatorPlan other) {
+        final OperatorPlan self = this;
+        return BaseOperatorPlan.isEqual(self, other);
+    }
+    
+    // Recursively checks that op1 and op2 have pairwise-equal predecessors,
+    // in the same order, all the way up to the roots of their plans.
+    // A null predecessor list and an empty one both mean "no predecessors".
+    // NOTE: shared ancestors are re-checked once per path that reaches them.
+    private static boolean checkPredecessors(Operator op1,
+                                      Operator op2) {
+        try {
+            List<Operator> preds = op1.getPlan().getPredecessors(op1);
+            List<Operator> otherPreds = op2.getPlan().getPredecessors(op2);
+            int count = (preds == null) ? 0 : preds.size();
+            int otherCount = (otherPreds == null) ? 0 : otherPreds.size();
+            if (count != otherCount) {
+                return false;
+            }
+            for (int i = 0; i < count; i++) {
+                Operator p1 = preds.get(i);
+                Operator p2 = otherPreds.get(i);
+                if (!p1.isEqual(p2) || !checkPredecessors(p1, p2)) {
+                    return false;
+                }
+            }
+            return true;
+        } catch (IOException e) {
+            // getPredecessors() throws only if an operator is not in its own plan
+            throw new RuntimeException(e);
+        }
+    }
+    
+    /**
+     * Structurally compares two plans. Every sink of p1 must be paired with a
+     * distinct sink of p2 that is equal to it and has equal predecessors.
+     * @param p1 first plan, may be null
+     * @param p2 second plan, may be null
+     * @return true if both references are the same object, or both are
+     * non-null and structurally equal; false otherwise
+     */
+    protected static boolean isEqual(OperatorPlan p1, OperatorPlan p2) {
+        if (p1 == p2) {
+            return true;
+        }
+        if (p1 == null || p2 == null) {
+            return false;
+        }
+
+        List<Operator> leaves = p1.getSinks();
+        List<Operator> otherLeaves = p2.getSinks();
+        if (leaves.size() != otherLeaves.size()) {
+            return false;
+        }
+        // There is no guarantee leaves are returned in any particular order, so
+        // search for a partner for each leaf. A leaf of p2 may be claimed only
+        // once; otherwise two equal leaves of p1 could both match one leaf of
+        // p2. Greedy pairing suffices as long as isEqual is an equivalence.
+        boolean[] claimed = new boolean[otherLeaves.size()];
+        for (Operator op1 : leaves) {
+            boolean found = false;
+            for (int i = 0; i < otherLeaves.size() && !found; i++) {
+                if (claimed[i]) {
+                    continue;
+                }
+                Operator op2 = otherLeaves.get(i);
+                if (op1.isEqual(op2) && checkPredecessors(op1, op2)) {
+                    claimed[i] = true;
+                    found = true;
+                }
+            }
+            if (!found) {
+                return false;
+            }
+        }
+        return true;
+    }
+    
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java Thu Feb 18 22:20:07 2010
@@ -64,7 +64,7 @@
 
         List<Operator> fifo = new ArrayList<Operator>();
         Set<Operator> seen = new HashSet<Operator>();
-        List<Operator> leaves = plan.getLeaves();
+        List<Operator> leaves = plan.getSinks();
         if (leaves == null) return;
         for (Operator op : leaves) {
             doAllPredecessors(op, seen, fifo);

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DepthFirstWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DepthFirstWalker.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DepthFirstWalker.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DepthFirstWalker.java Thu Feb 18 22:20:07 2010
@@ -45,7 +45,7 @@
      */
     @Override
     public void walk(PlanVisitor visitor) throws IOException {
-        List<Operator> roots = plan.getRoots();
+        List<Operator> roots = plan.getSources();
         Set<Operator> seen = new HashSet<Operator>();
 
         depthFirst(null, roots, seen, visitor);

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/Operator.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/Operator.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/Operator.java Thu Feb 18 22:20:07 2010
@@ -18,6 +18,7 @@
 
 package org.apache.pig.experimental.plan;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -26,6 +27,7 @@
     protected String name;
     protected OperatorPlan plan; // plan that contains this operator
     protected Map<String, Object> annotations;
+    protected final int hashPrime = 31;
 
     public Operator(String n, OperatorPlan p) {
         name = n;
@@ -36,8 +38,9 @@
     /**
      * Accept a visitor at this node in the graph.
      * @param v Visitor to accept.
+     * @throws IOException if visiting this operator fails; presumably propagated from the visitor
      */
-    public abstract void accept(PlanVisitor v);
+    public abstract void accept(PlanVisitor v) throws IOException;
 
     public String getName() {
         return name;
@@ -70,4 +73,12 @@
         return annotations.get(key);
     }
 
+    /**
+     * Shallow equality comparison between two operators.
+     * It returns true if the two operators have equivalent properties even if
+     * they are different objects. Here properties mean equivalent plan and
+     * equivalent name.
+     * @param operator the operator to compare this one against
+     * @return true if the two objects have equivalent properties, else false
+     */
+    public abstract boolean isEqual(Operator operator);
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorPlan.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorPlan.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorPlan.java Thu Feb 18 22:20:07 2010
@@ -19,14 +19,9 @@
 package org.apache.pig.experimental.plan;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.pig.impl.util.Pair;
 
 public interface OperatorPlan {
@@ -41,20 +36,20 @@
      * @return all operators in the plan that have no predecessors, or
      * an empty list if the plan is empty.
      */
-    public List<Operator> getRoots();
+    public List<Operator> getSources();
 
     /**
      * Get all operators in the plan that have no successors.
      * @return all operators in the plan that have no successors, or
      * an empty list if the plan is empty.
      */
-    public List<Operator> getLeaves();
+    public List<Operator> getSinks();
 
     /**
      * For a given operator, get all operators immediately before it in the
      * plan.
      * @param op operator to fetch predecessors of
-     * @return list of all operators imeediately before op, or an empty list
+     * @return list of all operators immediately before op, or an empty list
      * if op is a root.
      * @throws IOException if op is not in the plan.
      */
@@ -63,7 +58,7 @@
     /**
      * For a given operator, get all operators immediately after it.
      * @param op operator to fetch successors of
-     * @return list of all operators imeediately after op, or an empty list
+     * @return list of all operators immediately after op, or an empty list
      * if op is a leaf.
      * @throws IOException if op is not in the plan.
      */
@@ -117,4 +112,13 @@
      * @return an iterator of all operators in this plan
      */
     public Iterator<Operator> getOperators();
+    
+    /**
+     * This is like a shallow comparison.
+     * Two plans are equal if they have equivalent operators and equivalent 
+     * structure.
+     * @param other object to compare
+     * @return true if both plans are equivalent, false otherwise
+     */
+    public boolean isEqual( OperatorPlan other );
 }

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorSubPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorSubPlan.java?rev=911616&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorSubPlan.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorSubPlan.java Thu Feb 18 22:20:07 2010
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * A view over a subset of the operators of a base plan. The operators returned
+ * from the view are the very same objects as in the base plan; no copies are
+ * made. It is used to represent match results.
+ *
+ * Structural queries (predecessors, successors, sources, sinks) are answered
+ * from the base plan, restricted to the operators added to this view. The
+ * view cannot be re-wired: connect() and disconnect() are unsupported.
+ */
+public class OperatorSubPlan implements OperatorPlan {
+
+    private final OperatorPlan basePlan;
+    private final Set<Operator> operators;
+    // Lazily computed caches, discarded by add() and remove().
+    private List<Operator> roots;
+    private List<Operator> leaves;
+
+    /**
+     * @param base the plan whose operators this view selects from
+     */
+    public OperatorSubPlan(OperatorPlan base) {
+        basePlan = base;
+        roots = new ArrayList<Operator>();
+        leaves = new ArrayList<Operator>();
+        operators = new HashSet<Operator>();
+    }
+
+    /**
+     * @return the plan this view was created over
+     */
+    public OperatorPlan getBasePlan() {
+        return basePlan;
+    }
+
+    /**
+     * Add an operator of the base plan to this view. Previously computed
+     * sources and sinks are discarded, since the new operator may change them.
+     * @param op operator to include in the view
+     */
+    public void add(Operator op) {
+        if (operators.add(op)) {
+            invalidateCache();
+        }
+    }
+
+    public void connect(Operator from, int fromPos, Operator to, int toPos) {
+        throw new UnsupportedOperationException("connect() can not be called on OperatorSubPlan");
+    }
+
+    public void connect(Operator from, Operator to) {
+        throw new UnsupportedOperationException("connect() can not be called on OperatorSubPlan");
+    }
+
+    public Pair<Integer, Integer> disconnect(Operator from, Operator to) throws IOException {
+        throw new UnsupportedOperationException("disconnect() can not be called on OperatorSubPlan");
+    }
+
+    /**
+     * Get the operators of this view that have no successors inside the view.
+     * The result is cached until the view is next modified.
+     * @return the sinks of this view, or an empty list if the view is empty
+     */
+    public List<Operator> getSinks() {
+        if (leaves.isEmpty() && !operators.isEmpty()) {
+            for (Operator op : operators) {
+                try {
+                    if (getSuccessors(op) == null) {
+                        leaves.add(op);
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return leaves;
+    }
+
+    public Iterator<Operator> getOperators() {
+        return operators.iterator();
+    }
+
+    /**
+     * Get the operators immediately before op that are also in this view.
+     * @param op operator to fetch predecessors of
+     * @return predecessors of op within this view, or null if there are none
+     * @throws IOException propagated from the base plan's getPredecessors()
+     */
+    public List<Operator> getPredecessors(Operator op) throws IOException {
+        return restrictToView(basePlan.getPredecessors(op));
+    }
+
+    /**
+     * Get the operators of this view that have no predecessors inside the view.
+     * The result is cached until the view is next modified.
+     * @return the sources of this view, or an empty list if the view is empty
+     */
+    public List<Operator> getSources() {
+        if (roots.isEmpty() && !operators.isEmpty()) {
+            for (Operator op : operators) {
+                try {
+                    if (getPredecessors(op) == null) {
+                        roots.add(op);
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return roots;
+    }
+
+    /**
+     * Get the operators immediately after op that are also in this view.
+     * @param op operator to fetch successors of
+     * @return successors of op within this view, or null if there are none
+     * @throws IOException propagated from the base plan's getSuccessors()
+     */
+    public List<Operator> getSuccessors(Operator op) throws IOException {
+        return restrictToView(basePlan.getSuccessors(op));
+    }
+
+    /**
+     * Remove an operator from this view only; the base plan is not modified.
+     * @param op operator to drop from the view
+     */
+    public void remove(Operator op) throws IOException {
+        operators.remove(op);
+        invalidateCache();
+    }
+
+    public int size() {
+        return operators.size();
+    }
+
+    @Override
+    public boolean isEqual(OperatorPlan other) {
+        return BaseOperatorPlan.isEqual(this, other);
+    }
+
+    /**
+     * Keep only the candidates that belong to this view, preserving their order.
+     * Returns null rather than an empty list when none remain (or candidates is
+     * null): getSources() and getSinks() rely on null to detect roots and leaves.
+     * NOTE(review): the OperatorPlan javadoc promises an empty list instead.
+     */
+    private List<Operator> restrictToView(List<Operator> candidates) {
+        List<Operator> list = null;
+        if (candidates != null) {
+            for (Operator oper : candidates) {
+                if (operators.contains(oper)) {
+                    if (list == null) {
+                        list = new ArrayList<Operator>();
+                    }
+                    list.add(oper);
+                }
+            }
+        }
+        return list;
+    }
+
+    /**
+     * Drop the cached sources and sinks so they are recomputed on demand.
+     * Fresh lists are allocated instead of clearing the old ones, so a list
+     * returned earlier is never emptied under a caller that is iterating it.
+     */
+    private void invalidateCache() {
+        roots = new ArrayList<Operator>();
+        leaves = new ArrayList<Operator>();
+    }
+}

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanEdge.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanEdge.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanEdge.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanEdge.java Thu Feb 18 22:20:07 2010
@@ -83,7 +83,8 @@
         Operator keeper = null;
         for (int j = 0; i.hasNext(); j++) {
             keeper = i.next();
-            if (keeper.equals(value)) {
+            //if (keeper.equals(value)) {
+            if (keeper == value) {
                 i.remove();
                 index = j;
                 break;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanVisitor.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanVisitor.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanVisitor.java Thu Feb 18 22:20:07 2010
@@ -31,6 +31,9 @@
  */
 public abstract class PlanVisitor {
 
+    // TODO Remove this scope value
+    final protected static String DEFAULT_SCOPE = "scope";
+    
     protected OperatorPlan plan;
 
     /**

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java Thu Feb 18 22:20:07 2010
@@ -58,7 +58,7 @@
 
         List<Operator> fifo = new ArrayList<Operator>();
         Set<Operator> seen = new HashSet<Operator>();
-        List<Operator> roots = plan.getRoots();
+        List<Operator> roots = plan.getSources();
         if (roots == null) return;
         for (Operator op : roots) {
             doAllSuccessors(op, seen, fifo);

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/SubtreeDependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/SubtreeDependencyOrderWalker.java?rev=911616&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/SubtreeDependencyOrderWalker.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/SubtreeDependencyOrderWalker.java Thu Feb 18 22:20:07 2010
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A dependency-order walker limited to one sub-tree of the plan: it visits
+ * only the operators DependencyOrderWalker.doAllPredecessors() collects for
+ * the start node (its predecessor sub-tree), in the order collected.
+ *
+ * When constructed without a start node it walks the whole plan, exactly as
+ * DependencyOrderWalker.walk() does.
+ */
+public class SubtreeDependencyOrderWalker extends DependencyOrderWalker {
+    private final Operator startNode;
+
+    /**
+     * Create a walker over the whole plan (no start node).
+     * @param plan plan to walk
+     */
+    public SubtreeDependencyOrderWalker(OperatorPlan plan) {
+        this(plan, null);
+    }
+
+    /**
+     * @param plan plan to walk
+     * @param startNode operator whose predecessor sub-tree is visited, or
+     * null to walk the whole plan
+     */
+    public SubtreeDependencyOrderWalker(OperatorPlan plan, Operator startNode) {
+        super(plan);
+        this.startNode = startNode;
+    }
+
+    @Override
+    public void walk(PlanVisitor visitor) throws IOException {
+        if (startNode == null) {
+            // No sub-tree was requested: walk the full plan instead of passing
+            // null to doAllPredecessors() (NOTE(review): which presumably fails).
+            super.walk(visitor);
+            return;
+        }
+
+        List<Operator> fifo = new ArrayList<Operator>();
+        Set<Operator> seen = new HashSet<Operator>();
+
+        // Collect startNode's predecessor sub-tree into fifo.
+        doAllPredecessors(startNode, seen, fifo);
+
+        for (Operator op : fifo) {
+            op.accept(visitor);
+        }
+    }
+}

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java Thu Feb 18 22:20:07 2010
@@ -37,7 +37,7 @@
  * Each rule is has two parts:  a pattern and and associated transformer.
  * Transformers have two important functions:   check(), and transform().
  * The pattern describes a pattern of node types that the optimizer will
- * look ot match.  If that match is found anywhere in the plan, then check()
+ * look to match.  If that match is found anywhere in the plan, then check()
  * will be called.  check() allows the rule to look more in depth at the 
  * matched pattern and decide whether the rule should be run or not.  For
  * example, one might design a rule to push filters above join that would
@@ -74,7 +74,13 @@
         maxIter = (iterations < 1 ? defaultIterations : iterations);
     }
     
-    public void addPlanTransformListener(PlanTransformListener listener) {
+    /**
+     * Adds a listener to the optimization.  This listener will be fired 
+     * after each rule transforms a plan.  Listeners are guaranteed to
+     * be fired in the order they are added.
+     * @param listener listener to notify after each rule transforms the plan
+     */
+    protected void addPlanTransformListener(PlanTransformListener listener) {
         listeners.add(listener);
     }
     
@@ -87,6 +93,7 @@
      * @throws OptimizerException
      */
     public void optimize() throws IOException {
+
         for (Set<Rule> rs : ruleSets) {
             boolean sawMatch = false;
             int numIterations = 0;
@@ -101,7 +108,7 @@
                                 sawMatch = true;
                                 transformer.transform(m);
                                 for(PlanTransformListener l: listeners) {
-                                    l.transformed(plan, transformer);
+                                    l.transformed(plan, transformer.reportChanges());
                                 }
                             }
                         }
@@ -109,5 +116,5 @@
                 }
             } while(sawMatch && ++numIterations < maxIter);
         }
-    }
+    }    
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java Thu Feb 18 22:20:07 2010
@@ -18,6 +18,8 @@
 
 package org.apache.pig.experimental.plan.optimizer;
 
+import java.io.IOException;
+
 import org.apache.pig.experimental.plan.OperatorPlan;
 
 /**
@@ -26,9 +28,10 @@
 public interface PlanTransformListener {
     /**
      * the listener that is notified after a plan is transformed
-     * @param plan  the plan that is transformed
-     * @param transformer the transformer that transforms this plan
+     * @param fp  the full plan that has been transformed
+     * @param tp  a plan containing only the operators that have been transformed
+     * @throws IOException 
      */
-    public void transformed(OperatorPlan plan, Transformer transformer);
+    public void transformed(OperatorPlan fp, OperatorPlan tp) throws IOException;
 
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Rule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Rule.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Rule.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Rule.java Thu Feb 18 22:20:07 2010
@@ -28,33 +28,15 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.experimental.plan.BaseOperatorPlan;
 import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.experimental.plan.OperatorPlan;
-import org.apache.pig.experimental.plan.PlanVisitor;
-import org.apache.pig.impl.util.Pair;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
 
 /**
  * Rules describe a pattern of operators.  They also reference a Transformer.
  * If the pattern of operators is found one or more times in the provided plan,
  * then the optimizer will use the associated Transformer to transform the
  * plan.
- *
- * The syntax for rules is  (x(y, z)) where x is a base node, and y and z are
- * node that precede x.  So the graph for the Pig Latin script:
- * A = load;
- * B = load;
- * C = join A, B;
- * D = filter C
- * would be:  filter(join(load, load));
- * 
- * Rules with multiple end points (leaves) are expressed as (x(), y()) where
- * both x and y are leaves.
- * 
- * It is expected that the name given to each node in the pattern exactly
- * matches the name of the class of the node in the Plan to be matched.  So
- * to build a rule that matched a join followed by a filter in the logical
- * plan, the pattern would be LOFilter(LOJoin).
  */
 public abstract class Rule {
 
@@ -121,7 +103,7 @@
     public List<OperatorPlan> match(OperatorPlan plan) {
         currentPlan = plan;
         
-        List<Operator> leaves = pattern.getLeaves();
+        List<Operator> leaves = pattern.getSinks();
         
         Iterator<Operator> iter = plan.getOperators();
         List<OperatorPlan> matchedList = new ArrayList<OperatorPlan>();       
@@ -162,11 +144,11 @@
                                 siblings = plan.getSuccessors(s);
                             }else{
                                 // for a root, we get its siblings by getting all roots
-                                siblings = plan.getRoots();
+                                siblings = plan.getSources();
                             }
                         }catch(IOException e) {
                             // not going to happen
-			    throw new RuntimeException(e);
+                            throw new RuntimeException(e);
                         }
                         int index = siblings.indexOf(op);
                         if (siblings.size()-index < leaves.size()) {
@@ -234,21 +216,14 @@
     }
     
  
-    private class PatternMatchOperatorPlan implements OperatorPlan {
-        OperatorPlan parent;
-        List<Operator> roots;
-        List<Operator> leaves;
-        Set<Operator> operators;
-
-        public PatternMatchOperatorPlan(OperatorPlan parent) {
-            this.parent = parent;
-            roots = new ArrayList<Operator>();
-            leaves = new ArrayList<Operator>();
-            operators = new HashSet<Operator>();
+    private class PatternMatchOperatorPlan extends OperatorSubPlan {
+        
+        public PatternMatchOperatorPlan(OperatorPlan basePlan) {
+            super(basePlan);
         }    	    	
         
         protected boolean check(List<Operator> planOps) throws IOException {
-            List<Operator> patternOps = pattern.getLeaves();
+            List<Operator> patternOps = pattern.getSinks();
             if (planOps.size() != patternOps.size()) {
                 return false;
             }
@@ -258,10 +233,13 @@
                 if (!check(planOps.get(i), patternOps.get(i), s)) {
                     return false;
                 }
-                operators.addAll(s);
+                Iterator<Operator> iter = s.iterator();
+                while(iter.hasNext()) {
+                    add(iter.next());
+                }
             }
             
-            if (operators.size() == pattern.size()) {
+            if (size() == pattern.size()) {
                 return true;
             }
             
@@ -283,13 +261,9 @@
             if (!match(planOp, patternOp)) {
                 return false;
             }
-            
-            if (pattern.getLeaves().contains(patternOp) && !leaves.contains(planOp)) {
-                leaves.add(planOp);
-            }
-            
+                 
             // check if their predecessors match
-            List<Operator> preds1 = parent.getPredecessors(planOp);
+            List<Operator> preds1 = getBasePlan().getPredecessors(planOp);
             List<Operator> preds2 = pattern.getPredecessors(patternOp);
             if (preds1 == null && preds2 != null) {
                 return false;
@@ -300,27 +274,25 @@
             }
             
             // we've reached the root of the pattern, so a match is found
-            if (preds2 == null || preds2.size() == 0) {
-                if (!roots.contains(planOp)) {
-                    roots.add(planOp);
-                }
+            if (preds2 == null || preds2.size() == 0) {       
                 opers.push(planOp);
                 return true;
             }
             
-            int index = 0;
-            boolean match = true;
+            int index = 0;            
             // look for predecessors 
             while(index < preds1.size()) {
+                boolean match = true;
                 if (match(preds1.get(index), preds2.get(0))) {
                     if ( (preds1.size() - index) < preds2.size()) {
                         return false;
                     }
-                                        
+                             
+                    int oldSize = opers.size();
                     for(int i=0; i<preds2.size(); i++) {
                         if (!check(preds1.get(index+i), preds2.get(i), opers)) {
-                            for(int j=0; j<i; j++) {
-                                opers.pop();
+                            for(int j=opers.size(); j>oldSize; j--) {
+                                opers.pop();                                
                             }
                             match = false;
                             break;
@@ -335,67 +307,6 @@
             }
             
             return false;
-        }
-        
-        public void add(Operator op) {
-            throw new UnsupportedOperationException("add() can not be called on PatternMatchOperatorPlan");
-        }
-
-        public void connect(Operator from, int fromPos, Operator to, int toPos) {
-            throw new UnsupportedOperationException("connect() can not be called on PatternMatchOperatorPlan");
-        }
-
-        public void connect(Operator from, Operator to) {
-            throw new UnsupportedOperationException("connect() can not be called on PatternMatchOperatorPlan");
-        }
-
-        public Pair<Integer, Integer> disconnect(Operator from, Operator to) throws IOException {
-            throw new UnsupportedOperationException("disconnect() can not be called on PatternMatchOperatorPlan");
-        }
-
-        public List<Operator> getLeaves() {
-            return leaves;
-        }
-
-        public Iterator<Operator> getOperators() {
-            return operators.iterator();
-        }
-
-        public List<Operator> getPredecessors(Operator op) throws IOException {
-            List<Operator> l = parent.getPredecessors(op);
-            List<Operator> list = new ArrayList<Operator>();
-            for(Operator oper: l) {
-                if (operators.contains(oper)) {
-                    list.add(oper);
-                }
-            }
-            
-            return list;
-        }
-
-        public List<Operator> getRoots() {
-            return roots;
-        }
-
-        public List<Operator> getSuccessors(Operator op) throws IOException {
-            List<Operator> l = parent.getSuccessors(op);
-            List<Operator> list = new ArrayList<Operator>();
-            for(Operator oper: l) {
-                if (operators.contains(oper)) {
-                    list.add(oper);
-                }
-            }
-            
-            return list;
-        }
-
-        public void remove(Operator op) throws IOException {
-            throw new UnsupportedOperationException("remove() can not be called on PatternMatchOperatorPlan");
-        }
-
-        public int size() {
-            return operators.size();
-        }
-        
+        }    
     }
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Transformer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Transformer.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Transformer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Transformer.java Thu Feb 18 22:20:07 2010
@@ -43,5 +43,14 @@
      * @throws IOException
      */
     public abstract void transform(OperatorPlan matched) throws IOException;
+    
+    /**
+     * Report what parts of the tree were transformed.  This is so that 
+     * listeners can know which part of the tree to visit and modify
+     * schemas, annotations, etc.  Nodes that were removed will not be
+     * in this plan; it contains only nodes that were added or moved.
+     * @return OperatorPlan that describes just the changed nodes.
+     */
+    public abstract OperatorPlan reportChanges();
 
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileSpec.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileSpec.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileSpec.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileSpec.java Thu Feb 18 22:20:07 2010
@@ -20,7 +20,6 @@
 import java.io.Serializable;
 
 import org.apache.pig.FuncSpec;
-import org.apache.pig.impl.PigContext;
 
 
 /**
@@ -59,4 +58,20 @@
     public int getSize() {
         throw new UnsupportedOperationException("File Size not implemented yet");
     }
+    
+    @Override
+    public boolean equals(Object other) {
+        // instanceof is already false for null, so no separate null check.
+        if (!(other instanceof FileSpec)) return false;
+        FileSpec ofs = (FileSpec)other;
+        // Null-safe: nothing visible here guarantees fileName/funcSpec are set.
+        boolean sameFile = (fileName == null) ? ofs.fileName == null : fileName.equals(ofs.fileName);
+        return sameFile
+            && ((funcSpec == null) ? ofs.funcSpec == null : funcSpec.equals(ofs.funcSpec));
+    }
+    
+    @Override
+    public int hashCode() { // null-safe; NOTE(review): assumes getFuncName() is derived from funcSpec
+        return (funcSpec == null ? 0 : getFuncName().hashCode()) + (fileName == null ? 0 : fileName.hashCode());
+    }
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java Thu Feb 18 22:20:07 2010
@@ -20,11 +20,10 @@
 
 import java.util.List;
 
-import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
 
 /**
  * This abstract class represents the logical Binary Expression Operator
@@ -35,7 +34,7 @@
 
 public abstract class BinaryExpressionOperator extends ExpressionOperator {
     private static final long serialVersionUID = 2L;
-    private static Log log = LogFactory.getLog(BinaryExpressionOperator.class);
+    // private static Log log = LogFactory.getLog(BinaryExpressionOperator.class);
 
     /**
      * @param plan

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java Thu Feb 18 22:20:07 2010
@@ -40,7 +40,11 @@
 import org.apache.pig.impl.logicalLayer.LOJoin;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LogicalPlanCloner;
 import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
 
 /**
@@ -138,7 +142,7 @@
             // Limit cannot be pushed up
             if (predecessor instanceof LOCogroup || predecessor instanceof LOFilter ||
             		predecessor instanceof LOLoad || predecessor instanceof LOSplit ||
-            		predecessor instanceof LOSplitOutput || predecessor instanceof LODistinct || predecessor instanceof LOJoin)
+            		predecessor instanceof LODistinct || predecessor instanceof LOJoin)
             {
             	return;
             }
@@ -234,6 +238,48 @@
             		throw new OptimizerException(msg, errCode, PigException.BUG, e);
             	}
             }
+            // Limit and OrderBy (LOSort) can be separated by split
+            else if (predecessor instanceof LOSplitOutput) {               
+                if(mode == ExecType.LOCAL) {
+                    //We don't need this optimisation to happen in the local mode.
+                    //so we do nothing here.
+                } else {
+                    List<LogicalOperator> grandparants = mPlan
+                            .getPredecessors(predecessor);
+                    // After insertion of splitters, any node in the plan can 
+                    // have at most one predecessor
+                    if (grandparants != null && grandparants.size() != 0
+                            && grandparants.get(0) instanceof LOSplit) {                        
+                        List<LogicalOperator> greatGrandparants = mPlan
+                                .getPredecessors(grandparants.get(0));
+                        if (greatGrandparants != null
+                                && greatGrandparants.size() != 0
+                                && greatGrandparants.get(0) instanceof LOSort) {                           
+                            LOSort sort = (LOSort)greatGrandparants.get(0);
+                            LOSort newSort = new LOSort(
+                                    sort.getPlan(),
+                                    new OperatorKey(
+                                            sort.getOperatorKey().scope,
+                                            NodeIdGenerator
+                                                    .getGenerator()
+                                                    .getNextNodeId(
+                                                            sort.getOperatorKey().scope)),
+                                    sort.getSortColPlans(), 
+                                    sort.getAscendingCols(), 
+                                    sort.getUserFunc()); 
+                                                  
+                            newSort.setLimit(limit.getLimit());
+                            try {
+                                mPlan.replace(limit, newSort);
+                            } catch (PlanException e) {
+                                int errCode = 2012;
+                                String msg = "Can not replace LOLimit with LOSort after splitter";
+                                throw new OptimizerException(msg, errCode, PigException.BUG, e);
+                            }
+                        }
+                    }
+                }
+            }
             else {
                 int errCode = 2013;
                 String msg = "Moving LOLimit in front of " + predecessor.getClass().getSimpleName() + " is not implemented";

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java Thu Feb 18 22:20:07 2010
@@ -93,7 +93,7 @@
                 errCode = 4000;
                 break;
             }
-            errMsg = "Output specification is invalid: "+outLoc;
+            errMsg = "Output specification '"+outLoc+"' is invalid or already exists";
             msgCollector.collect(errMsg, MessageType.Error) ;
             throw new PlanValidationException(errMsg, errCode, errSrc, ioe);
         } catch (InterruptedException ie) {

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java Thu Feb 18 22:20:07 2010
@@ -39,13 +39,40 @@
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.test.utils.LogicalPlanTester;
 
 public class TestCombiner extends TestCase {
 
-    
-
     MiniCluster cluster = MiniCluster.buildCluster();
+
+    /**
+     * Verifies that no combine plan is generated when COUNT is applied to the
+     * result of another user function (org.apache.pig.builtin.Distinct).
+     */
+    @Test
+    public void testSuccessiveUserFuncs1() throws Exception {
+        LogicalPlanTester tester = new LogicalPlanTester();
+        tester.buildPlan("a = load 'students.txt' as (c1,c2,c3,c4); ");
+        tester.buildPlan("c = group a by c2; ");
+        tester.buildPlan("f = foreach c generate COUNT(org.apache.pig.builtin.Distinct($1.$2)); ");
+        LogicalPlan lp = tester.buildPlan("store f into 'out';");
+        assertNoCombinePlan(lp);
+    }
+
+    /**
+     * Same as {@link #testSuccessiveUserFuncs1()}, but COUNT wraps a dummy UDF
+     * (JiraPig1030) whose arguments are themselves UDF calls, i.e. deeper nesting.
+     */
+    @Test
+    public void testSuccessiveUserFuncs2() throws Exception {
+        LogicalPlanTester tester = new LogicalPlanTester();
+        tester.buildPlan("a = load 'students.txt' as (c1,c2,c3,c4); ");
+        tester.buildPlan("c = group a by c2; ");
+        String dummyUDF = JiraPig1030.class.getName();
+        // Builds: COUNT(<dummyUDF>(org.apache.pig.builtin.Distinct($1.$2),<dummyUDF>()))
+        tester.buildPlan("f = foreach c generate COUNT(" + dummyUDF
+                + "(org.apache.pig.builtin.Distinct($1.$2)," + dummyUDF + "())); ");
+        LogicalPlan lp = tester.buildPlan("store f into 'out';");
+        assertNoCombinePlan(lp);
+    }
+
+    /**
+     * Compiles the given logical plan to a physical plan and then to a
+     * map-reduce plan using a PigContext bound to the test cluster. Asserts that
+     * the first root operator of the resulting map-reduce plan has an empty
+     * combine plan.
+     *
+     * @param lp logical plan ending in a store
+     * @throws Exception if plan compilation fails
+     */
+    private void assertNoCombinePlan(LogicalPlan lp) throws Exception {
+        PigContext pc = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()).getPigContext();
+        assertTrue("expected an empty combine plan for nested user functions",
+                Util.buildMRPlan(Util.buildPhysicalPlan(lp, pc), pc)
+                        .getRoots().get(0).combinePlan.isEmpty());
+    }
     
     @Test
     public void testOnCluster() throws Exception {
@@ -117,7 +144,6 @@
         return inputFileName;
     }
     
-    
     @Test
     public void testNoCombinerUse() {
         // To simulate this, we will have two input files
@@ -373,7 +399,7 @@
             return "";
         }
     }
-    
+   
     @Test
     public void testJiraPig1030() {
         // test that combiner is NOT invoked when
@@ -416,4 +442,5 @@
             }
         }
     }
+
 }