You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/25 19:30:40 UTC

svn commit: r989238 - in /hadoop/pig/trunk: ./ src/org/apache/pig/newplan/logical/optimizer/ src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/ test/org/apache/pig/test/data/DotFiles/

Author: daijy
Date: Wed Aug 25 17:30:40 2010
New Revision: 989238

URL: http://svn.apache.org/viewvc?rev=989238&view=rev
Log:
PIG-1514: Migrate logical optimization rule: OpLimitOptimizer (xuefuz via daijy)

Added:
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/OptimizeLimit.java
    hadoop/pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan1.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan10.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan2.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan3.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan4.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan5.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan6.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan7.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan8.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan9.dot
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=989238&r1=989237&r2=989238&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Aug 25 17:30:40 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1514: Migrate logical optimization rule: OpLimitOptimizer (xuefuz via daijy)
+
 PIG-1551: Improve dynamic invokers to deal with no-arg methods and array parameters (dvryaboy)
 
 PIG-1311:  Document audience and stability for remaining interfaces (gates)

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=989238&r1=989237&r2=989238&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Wed Aug 25 17:30:40 2010
@@ -31,6 +31,7 @@ import org.apache.pig.newplan.logical.ru
 import org.apache.pig.newplan.logical.rules.ImplicitSplitInserter;
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
 import org.apache.pig.newplan.logical.rules.MergeFilter;
+import org.apache.pig.newplan.logical.rules.OptimizeLimit;
 import org.apache.pig.newplan.logical.rules.PushUpFilter;
 import org.apache.pig.newplan.logical.rules.SplitFilter;
 import org.apache.pig.newplan.logical.rules.StreamTypeCastInserter;
@@ -71,6 +72,15 @@ public class LogicalPlanOptimizer extend
         if (!s.isEmpty())
             ls.add(s);
         
+        // Limit Set
+        // This set of rules optimizes limit operators
+        s = new HashSet<Rule>();
+        // Optimize limit
+        r = new OptimizeLimit("OptimizeLimit");
+        checkAndAddRule(s, r);
+        if (!s.isEmpty())
+            ls.add(s);
+        
         // Split Set
         // This set of rules does splitting of operators only.
         // It does not move operators

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/OptimizeLimit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/OptimizeLimit.java?rev=989238&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/OptimizeLimit.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/OptimizeLimit.java Wed Aug 25 17:30:40 2010
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+public class OptimizeLimit extends Rule {
+
+    public OptimizeLimit(String name) {
+        super(name, false);
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator limit = new LOLimit(plan, 0);
+        plan.add(limit);
+        return plan;
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new OptimizeLimitTransformer();
+    }
+
+    public class OptimizeLimitTransformer extends Transformer {
+
+        @Override
+        public boolean check(OperatorPlan matched) {
+            LOLimit limit = (LOLimit) matched.getSources().get(0);
+            // Look up what feeds the limit; with no predecessor there is nothing to optimize.
+            List<Operator> preds = currentPlan.getPredecessors(limit);
+            if (preds == null || preds.size() == 0)
+                return false;
+
+            Operator pred = preds.get(0);
+
+            // Limit cannot be pushed up
+            if (pred instanceof LOCogroup || pred instanceof LOFilter
+                    || pred instanceof LOLoad || pred instanceof LOSplit
+                    || pred instanceof LODistinct || pred instanceof LOJoin) {
+                return false;
+            }
+
+            // Limit cannot be pushed in front of ForEach if it has a flatten
+            if (pred instanceof LOForEach) {
+                LOForEach foreach = (LOForEach) pred;
+                LogicalPlan innerPlan = foreach.getInnerPlan();
+                Iterator<Operator> it = innerPlan.getOperators();
+                while (it.hasNext()) {
+                    Operator op = it.next();
+                    if (op instanceof LOGenerate) {
+                        LOGenerate gen = (LOGenerate) op;
+                        boolean[] flattenFlags = gen.getFlattenFlags();
+                        if (flattenFlags != null) {
+                            for (boolean flatten : flattenFlags) {
+                                if (flatten)
+                                    return false;
+                            }
+                        }
+                    }
+                }
+            }
+            return true;
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            return currentPlan;
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws FrontendException {
+
+            LOLimit limit = (LOLimit) matched.getSources().get(0);
+
+            // Find the operator immediately preceding the limit.
+            List<Operator> preds = currentPlan.getPredecessors(limit);
+            Operator pred = preds.get(0);
+
+            if (pred instanceof LOForEach) {
+                // We can safely move LOLimit up
+                // Get the operator before the LOForEach
+                Operator prepredecessor = currentPlan.getPredecessors(pred)
+                        .get(0);
+                Operator succ = currentPlan.getSuccessors(limit).get(0);
+                currentPlan.disconnect(prepredecessor, pred);
+                currentPlan.disconnect(pred, limit);
+                currentPlan.disconnect(limit, succ);
+                currentPlan.connect(prepredecessor, limit);
+                currentPlan.connect(limit, pred);
+                currentPlan.connect(pred, succ);
+            } else if (pred instanceof LOCross || pred instanceof LOUnion) {
+                // For these operators the limit can be duplicated, with a
+                // copy pushed in front of each input of the operator
+                // (that is, if you have X->limit, you can transform that to
+                // limit->X->limit). An input that is already a limit is tightened instead.
+                LOLimit newLimit = null;
+                List<Operator> nodesToProcess = new ArrayList<Operator>();
+                for (Operator prepredecessor : currentPlan
+                        .getPredecessors(pred))
+                    nodesToProcess.add(prepredecessor);
+                for (Operator prepredecessor : nodesToProcess) {
+                    if (prepredecessor instanceof LOLimit) {
+                        LOLimit l = (LOLimit) prepredecessor;
+                        l.setLimit(l.getLimit() < limit.getLimit() ? l
+                                .getLimit() : limit.getLimit());
+                    } else {
+                        newLimit = new LOLimit((LogicalPlan) currentPlan, limit
+                                .getLimit());
+                        currentPlan.add(newLimit);
+                        currentPlan.disconnect(prepredecessor, pred);
+                        currentPlan.connect(prepredecessor, newLimit);
+                        currentPlan.connect(newLimit, pred);
+                    }
+                }
+            } else if (pred instanceof LOSort) {
+                LOSort sort = (LOSort) pred;
+                if (sort.getLimit() == -1)
+                    sort.setLimit(limit.getLimit());
+                else
+                    sort.setLimit(sort.getLimit() < limit.getLimit() ? sort
+                            .getLimit() : limit.getLimit());
+
+                // remove the limit
+                Operator succ = currentPlan.getSuccessors(limit).get(0);
+                currentPlan.disconnect(sort, limit);
+                currentPlan.disconnect(limit, succ);
+                currentPlan.connect(sort, succ);
+                currentPlan.remove(limit);
+            } else if (pred instanceof LOLimit) {
+                // Limit is merged into another LOLimit
+                LOLimit beforeLimit = (LOLimit) pred;
+                beforeLimit
+                        .setLimit(beforeLimit.getLimit() < limit.getLimit() ? beforeLimit
+                                .getLimit()
+                                : limit.getLimit());
+                // remove the limit
+                Operator succ = currentPlan.getSuccessors(limit).get(0);
+                currentPlan.disconnect(beforeLimit, limit);
+                currentPlan.disconnect(limit, succ);
+                currentPlan.connect(beforeLimit, succ);
+                currentPlan.remove(limit);
+            } else if (pred instanceof LOSplitOutput) {
+                // Limit and OrderBy (LOSort) can be separated by split
+                List<Operator> grandparants = currentPlan.getPredecessors(pred);
+                // After insertion of splitters, any node in the plan can
+                // have at most one predecessor
+                if (grandparants != null && grandparants.size() != 0
+                        && grandparants.get(0) instanceof LOSplit) {
+                    List<Operator> greatGrandparants = currentPlan
+                            .getPredecessors(grandparants.get(0));
+                    if (greatGrandparants != null
+                            && greatGrandparants.size() != 0
+                            && greatGrandparants.get(0) instanceof LOSort) {
+                        LOSort sort = (LOSort) greatGrandparants.get(0);
+                        LOSort newSort = new LOSort(sort.getPlan(), sort
+                                .getSortColPlans(), sort.getAscendingCols(),
+                                sort.getUserFunc());
+                        newSort.setLimit(limit.getLimit());
+
+                        Operator succ = currentPlan.getSuccessors(limit).get(0);
+                        currentPlan.disconnect(pred, limit);
+                        currentPlan.disconnect(limit, succ);
+                        currentPlan.add(newSort);
+                        currentPlan.connect(pred, newSort);
+                        currentPlan.connect(newSort, succ);
+                        currentPlan.remove(limit);
+                    }
+                }
+            }
+        }
+    }
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java?rev=989238&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java Wed Aug 25 17:30:40 2010
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.newplan.logical.expression.AddExpression;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.BinCondExpression;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.DereferenceExpression;
+import org.apache.pig.newplan.logical.expression.DivideExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
+import org.apache.pig.newplan.logical.expression.IsNullExpression;
+import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.LessThanExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.expression.MapLookupExpression;
+import org.apache.pig.newplan.logical.expression.ModExpression;
+import org.apache.pig.newplan.logical.expression.MultiplyExpression;
+import org.apache.pig.newplan.logical.expression.NegativeExpression;
+import org.apache.pig.newplan.logical.expression.NotEqualExpression;
+import org.apache.pig.newplan.logical.expression.NotExpression;
+import org.apache.pig.newplan.logical.expression.OrExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.RegexExpression;
+import org.apache.pig.newplan.logical.expression.SubtractExpression;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOStream;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This class transforms LogicalPlan to its textual representation
+ *
+ */
+public class OptimizeLimitPlanPrinter extends LogicalRelationalNodesVisitor {
+    private StringBuilder sb = null ;
+    
+    public OptimizeLimitPlanPrinter(LogicalPlan plan) throws FrontendException {
+        super( plan, new DependencyOrderWalker( plan ) );
+        sb = new StringBuilder() ;
+    }
+
+    @Override
+    public void visit(LOLoad load) throws FrontendException {
+        appendEdges( load );
+    }
+
+    @Override
+    public void visit(LOJoin join) throws FrontendException {
+        appendEdges( join );
+    }
+    
+    @Override
+    public void visit(LOGenerate gen) throws FrontendException {
+        appendEdges( gen );
+    }
+    
+    @Override
+    public void visit(LOInnerLoad load) throws FrontendException {
+        appendEdges( load );
+    }
+
+    @Override
+    public void visit(LOUnion loUnion) throws FrontendException {
+        appendEdges( loUnion );
+    }
+    
+    @Override
+    public void visit(LOStream loStream) throws FrontendException {
+        appendEdges( loStream );
+    }
+    
+    @Override
+    public void visit(LOFilter filter) throws FrontendException {
+    	LogicalExpressionPlan expPlan = filter.getFilterPlan();
+    	MyLogicalExpressionVisitor visitor = new MyLogicalExpressionVisitor( expPlan );
+    	sb.append("[" + visitor.printToString() + "]" );
+    	
+    	appendEdges( filter );
+    }
+    
+    @Override
+    public void visit(LOStore store) throws FrontendException {
+        appendEdges( store );
+    }
+    
+    @Override
+    public void visit(LOForEach foreach) throws FrontendException {
+        appendOp( foreach ) ;
+        
+    	boolean hasFlatten = false;
+        LogicalPlan inner1 = foreach.getInnerPlan();
+        Iterator<Operator> it = inner1.getOperators();
+        while( it.hasNext() ) {
+            Operator op = it.next();
+            if( op instanceof LOGenerate ) {
+                LOGenerate gen = (LOGenerate)op;
+                boolean[] flattenFlags = gen.getFlattenFlags();
+                if( flattenFlags != null ) {
+                    for( boolean flatten : flattenFlags ) {
+                        if( flatten ) {
+                            hasFlatten = true;
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+    	if (hasFlatten)
+    		sb.append(" [hasflat=\"true\"]");
+    	else
+    		sb.append(" [hasflat=\"false\"]");
+        sb.append(";\n");
+        appendEdges(foreach);
+    }
+    
+    @Override
+    public void visit(LOCogroup loCogroup) throws FrontendException {
+    	appendEdges(loCogroup);
+    }
+    
+    @Override
+    public void visit(LOSplit loSplit) throws FrontendException {
+    	appendEdges(loSplit);
+    }
+    
+    @Override
+    public void visit(LOSplitOutput loSplitOutput) throws FrontendException {
+    }
+    
+    @Override
+    public void visit(LOSort loSort) throws FrontendException {
+        appendOp(loSort) ;
+        sb.append(" [limit=\"" + loSort.getLimit() + "\"]");
+        sb.append(";\n");
+        appendEdges(loSort);
+    }
+    
+    @Override
+    public void visit(LODistinct loDistinct) throws FrontendException {
+        appendEdges(loDistinct);
+    }
+    
+    @Override
+    public void visit(LOLimit loLimit) throws FrontendException {
+        appendOp(loLimit) ;
+        sb.append(" [limit=\""+loLimit.getLimit()+"\"]");
+        sb.append(";\n");
+        appendEdges(loLimit);
+    }
+    
+    @Override
+    public void visit(LOCross loCross) throws FrontendException {
+        appendEdges(loCross);
+    }
+    
+    private void appendOp(Operator op)  {
+        sb.append("    "+op.getClass().getSimpleName()) ;
+
+    }
+    
+    private void appendEdges(Operator op) throws FrontendException {
+        List<Operator> list = currentWalker.getPlan().getSuccessors(op) ;
+
+		if (list!=null) {
+			for(Operator tmp: list) {         
+			    sb.append("    "+op.getClass().getSimpleName()+" -> ");
+			    sb.append(tmp.getClass().getSimpleName()+";\n");
+		    }
+		}
+    }
+    
+    /***
+     * Runs the visit over the plan and returns the text accumulated by this visitor.
+     * @throws IOException 
+     */
+    public String printToString() throws IOException {
+        try {
+            visit() ;
+        } catch(VisitorException vse) {
+            throw new AssertionError("Error while transforming type graph to text") ;
+        }
+        return sb.toString() ;
+    }
+}
+
+class MyLogicalExpressionVisitor extends LogicalExpressionVisitor {
+    private StringBuilder sb = null ;
+    
+    public MyLogicalExpressionVisitor(LogicalExpressionPlan plan) throws FrontendException {
+        super( plan, new DependencyOrderWalker( plan ) );
+        sb = new StringBuilder() ;
+    }
+
+    @Override
+    public void visit(AndExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+    
+    @Override
+    public void visit(OrExpression op) throws FrontendException { 
+    	appendEdges( op );
+    }
+
+    @Override
+    public void visit(EqualExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+    
+    @Override
+    public void visit(ProjectExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+    
+    @Override
+    public void visit(ConstantExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+    
+    @Override
+    public void visit(CastExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+
+    @Override
+    public void visit(GreaterThanExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+    
+    @Override
+    public void visit(GreaterThanEqualExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+
+    @Override
+    public void visit(LessThanExpression op) throws FrontendException { 
+    	appendEdges( op );
+    }
+    
+    @Override
+    public void visit(LessThanEqualExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+
+    @Override
+    public void visit(NotEqualExpression op) throws FrontendException { 
+    	appendEdges( op );
+    }
+
+    @Override
+    public void visit(NotExpression op ) throws FrontendException {
+    	appendEdges( op );
+    }
+
+    @Override
+    public void visit(IsNullExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+    
+    @Override
+    public void visit(NegativeExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+    
+    @Override
+    public void visit(AddExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+    
+    @Override
+    public void visit(SubtractExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+    
+    @Override
+    public void visit(MultiplyExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+    
+    @Override
+    public void visit(ModExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+    
+    @Override
+    public void visit(DivideExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+
+    @Override
+    public void visit(MapLookupExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+
+    @Override
+    public void visit(BinCondExpression op) throws FrontendException {        
+    	appendEdges(op);
+    }
+
+    @Override
+    public void visit(UserFuncExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+
+    @Override
+    public void visit(DereferenceExpression bagDerefenceExpression) throws FrontendException {
+    	appendEdges( bagDerefenceExpression );
+    }
+
+    public void visit(RegexExpression op) throws FrontendException {
+    	appendEdges( op );
+    }
+
+    private void appendEdges(Operator op) throws FrontendException {
+        List<Operator> list = currentWalker.getPlan().getSuccessors(op) ;
+
+		if (list!=null) {
+			for(Operator tmp: list) {         
+			    sb.append("    "+op.getClass().getSimpleName()+" -> ");
+			    sb.append(tmp.getClass().getSimpleName()+";\n");
+		    }
+		}
+    }
+
+    public String printToString() throws FrontendException {
+        try {
+            visit() ;
+        } catch(VisitorException vse) {
+            throw new AssertionError("Error while transforming type graph to text") ;
+        }
+        return sb.toString() ;
+    }
+    
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java?rev=989238&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java Wed Aug 25 17:30:40 2010
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
+import org.apache.pig.newplan.logical.rules.OptimizeLimit;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.optimizer.OpLimitOptimizer;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.test.TestLogicalOptimizer.LogicalOptimizerDerivative;
+import org.apache.pig.test.utils.LogicalPlanTester;
+
+import junit.framework.Assert;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link OptimizeLimit} rule of the new logical plan optimizer.
+ * <p>
+ * Most tests build a plan with the old-plan {@link LogicalPlanTester},
+ * migrate it to the new plan, run a two-rule optimizer over it
+ * ({@link LoadTypeCastInserter} followed by {@link OptimizeLimit}), print the
+ * result as a DOT-like graph and compare that text with a golden file under
+ * {@link #FILE_BASE_LOCATION}.
+ */
+public class TestOptimizeLimit {
+    /** Directory holding the golden <code>.dot</code> files. */
+    final String FILE_BASE_LOCATION = "test/org/apache/pig/test/data/DotFiles/" ;
+    /** Upper bound, in bytes, on how much of a golden file is read. */
+    static final int MAX_SIZE = 100000;
+    PigContext pc = new PigContext( ExecType.LOCAL, new Properties() );
+
+    LogicalPlanTester planTester = new LogicalPlanTester(pc) ;
+
+    @BeforeClass
+    public static void setup() {
+        // nothing to prepare
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        // nothing to clean up
+    }
+
+    /**
+     * Prints the given plan, echoes both the actual and the expected text to
+     * standard output, and asserts that they are equal. The golden file is
+     * expected to end with one newline more than the printed plan.
+     *
+     * @param plan     optimized plan to print
+     * @param filename path of the golden file to compare against
+     * @throws Exception if printing the plan or reading the file fails
+     */
+    void compareWithGoldenFile(LogicalPlan plan, String filename) throws Exception {
+        String actualPlan = printLimitGraph(plan);
+        System.out.println("We get:");
+        System.out.println(actualPlan);
+
+        String goldenPlan = readGoldenFile(filename);
+        System.out.println("Expected:");
+        System.out.println(goldenPlan);
+
+        Assert.assertEquals(goldenPlan, actualPlan + "\n");
+    }
+
+    /**
+     * Reads at most {@link #MAX_SIZE} bytes of a file and decodes them with
+     * the platform default charset. The stream is always closed, the read is
+     * repeated until the buffer is full or end of file is reached (a single
+     * read() call is allowed to return fewer bytes than are available), and
+     * an empty file yields an empty string.
+     */
+    private static String readGoldenFile(String filename) throws IOException {
+        byte[] buffer = new byte[MAX_SIZE];
+        int length = 0;
+        FileInputStream fis = new FileInputStream(filename);
+        try {
+            int read;
+            while (length < buffer.length
+                    && (read = fis.read(buffer, length, buffer.length - length)) != -1) {
+                length += read;
+            }
+        } finally {
+            fis.close();
+        }
+        return new String(buffer, 0, length);
+    }
+
+    /**
+     * Renders a plan as <code>digraph graph1 { ... }</code> using
+     * {@link OptimizeLimitPlanPrinter} for the body.
+     *
+     * @param plan plan to print
+     * @return the graph text, without a trailing newline
+     * @throws IOException if the printer fails
+     */
+    public static String printLimitGraph(LogicalPlan plan) throws IOException {
+        OptimizeLimitPlanPrinter printer = new OptimizeLimitPlanPrinter(plan) ;
+        String rep = "digraph graph1 {\n";
+        rep = rep + printer.printToString() ;
+        rep = rep + "}";
+        return rep;
+    }
+
+    @Test
+    // Merge limit into sort
+    public void testOPLimit1Optimizer() throws Exception {
+        planTester.buildPlan("A = load 'myfile';");
+        planTester.buildPlan("B = order A by $0;");
+        planTester.buildPlan("C = limit B 100;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan1.dot");
+    }
+
+    @Test
+    // Merge limit into limit
+    public void testOPLimit2Optimizer() throws Exception {
+        planTester.buildPlan("A = load 'myfile';");
+        planTester.buildPlan("B = limit A 10;");
+        planTester.buildPlan("C = limit B 100;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan2.dot");
+    }
+
+    @Test
+    // Duplicate limit with two inputs
+    public void testOPLimit3Optimizer() throws Exception {
+        planTester.buildPlan("A = load 'myfile1';");
+        planTester.buildPlan("B = load 'myfile2';");
+        planTester.buildPlan("C = cross A, B;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("D = limit C 100;");
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan3.dot");
+    }
+
+    @Test
+    // Duplicate limit with one input
+    public void testOPLimit4Optimizer() throws Exception {
+        planTester.buildPlan("A = load 'myfile1';");
+        planTester.buildPlan("B = group A by $0;");
+        planTester.buildPlan("C = foreach B generate flatten(A);");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("D = limit C 100;");
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan4.dot");
+    }
+
+    @Test
+    // Move limit up
+    public void testOPLimit5Optimizer() throws Exception {
+        planTester.buildPlan("A = load 'myfile1';");
+        planTester.buildPlan("B = foreach A generate $0;");
+        planTester.buildPlan("C = limit B 100;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan5.dot");
+    }
+
+    @Test
+    // Multiple LOLimit
+    public void testOPLimit6Optimizer() throws Exception {
+        planTester.buildPlan("A = load 'myfile';");
+        planTester.buildPlan("B = limit A 50;");
+        planTester.buildPlan("C = limit B 20;");
+        planTester.buildPlan("D = limit C 100;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store D into 'empty';" );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan6.dot");
+    }
+
+    @Test
+    // Limit stay the same for ForEach with a flatten
+    public void testOPLimit7Optimizer() throws Exception {
+        planTester.buildPlan("A = load 'myfile1';");
+        planTester.buildPlan("B = foreach A generate flatten($0);");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("C = limit B 100;");
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan7.dot");
+    }
+
+    @Test
+    //Limit in the local mode, need to make sure limit stays after a sort
+    public void testOPLimit8Optimizer() throws Exception {
+        planTester.buildPlan("A = load 'myfile';");
+        planTester.buildPlan("B = order A by $0;");
+        planTester.buildPlan("C = limit B 10;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan8.dot");
+    }
+
+    @Test
+    // NOTE(review): builds the same script as testOPLimit8Optimizer and only
+    // differs in the golden file it is compared with -- confirm whether a
+    // different scenario was intended here.
+    public void testOPLimit9Optimizer() throws Exception {
+        planTester.buildPlan("A = load 'myfile';");
+        planTester.buildPlan("B = order A by $0;");
+        planTester.buildPlan("C = limit B 10;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan9.dot");
+    }
+
+    @Test
+    //See bug PIG-913
+    public void testOPLimit10Optimizer() throws Exception {
+        planTester.buildPlan("A = load 'myfile' AS (s:chararray);");
+        planTester.buildPlan("B = limit A 100;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("C = GROUP B by $0;");
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan10.dot");
+    }
+
+    /**
+     * Test that {@link OpLimitOptimizer} returns false on the check if
+     * pre-conditions for pushing limit up are not met. This is verified
+     * indirectly: the old-plan optimizer must stop before it has used up its
+     * maximum number of iterations.
+     * @throws Exception
+     */
+    @Test
+    public void testOpLimitOptimizerCheck() throws Exception {
+        planTester.buildPlan("A = load 'myfile';");
+        planTester.buildPlan("B = foreach A generate $0;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("C = limit B 100;");
+        LogicalOptimizerDerivative optimizer = new LogicalOptimizerDerivative(plan);
+        int numIterations = optimizer.optimize();
+        Assert.assertFalse("Checking number of iterations of the optimizer [actual = "
+                + numIterations + ", expected < " + optimizer.getMaxIterations() +
+                "]", optimizer.getMaxIterations() == numIterations);
+    }
+
+    @Test
+    //Test to ensure that the right exception is thrown
+    public void testErrOpLimitOptimizer() throws Exception {
+        org.apache.pig.impl.logicalLayer.LogicalPlan lp = new org.apache.pig.impl.logicalLayer.LogicalPlan();
+        OpLimitOptimizer olo = new OpLimitOptimizer(lp);
+
+        // Capture the exception instead of asserting inside the catch block,
+        // so that the test fails when nothing is thrown at all.
+        Exception caught = null;
+        try {
+            olo.transform(lp.getRoots());
+        } catch(Exception e) {
+            caught = e;
+        }
+
+        Assert.assertNotNull("transform() on an empty plan did not throw", caught);
+        // Check the type explicitly so that a wrong exception type shows up as
+        // an assertion failure rather than a ClassCastException.
+        Assert.assertTrue("Unexpected exception type: " + caught,
+                caught instanceof OptimizerException);
+        Assert.assertEquals(2052, ((OptimizerException) caught).getErrorCode());
+    }
+
+    @Test
+    //See bug PIG-995
+    //We shall throw no exception here
+    public void testOPLimit11Optimizer() throws Exception {
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("B = foreach (limit (order (load 'myfile' AS (a0, a1, a2)) by $1) 10) generate $0;");
+        migrateAndOptimizePlan(plan);
+    }
+
+
+    /**
+     * Optimizer limited to two rule sets, applied in order: one containing
+     * only {@link LoadTypeCastInserter} and one containing only
+     * {@link OptimizeLimit}.
+     */
+    public class MyPlanOptimizer extends LogicalPlanOptimizer {
+        protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
+            // presumably the empty set is the list of rules to turn off --
+            // TODO confirm against LogicalPlanOptimizer
+            super( p, iterations, new HashSet<String>() );
+        }
+
+        protected List<Set<Rule>> buildRuleSets() {
+            List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+
+            Set<Rule> s = null;
+            Rule r = null;
+
+            s = new HashSet<Rule>();
+            r = new LoadTypeCastInserter( "TypeCastInserter");
+            s.add(r);
+            ls.add(s);
+
+            s = new HashSet<Rule>();
+            r = new OptimizeLimit("OptimizeLimit");
+            s.add(r);
+            ls.add(s);
+
+            return ls;
+        }
+    }
+
+    /**
+     * Migrates an old-style logical plan to the new plan representation and
+     * runs {@link MyPlanOptimizer} over it with an iteration limit of 3.
+     *
+     * @param plan old-style logical plan
+     * @return the migrated plan after optimization
+     * @throws IOException if migration or optimization fails
+     */
+    private LogicalPlan migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) throws IOException {
+        LogicalPlan newLogicalPlan = migratePlan( plan );
+        PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
+        optimizer.optimize();
+        return newLogicalPlan;
+    }
+
+    /**
+     * Converts an old-style logical plan into a new-style one using
+     * {@link LogicalPlanMigrationVistor}.
+     *
+     * @param lp old-style logical plan
+     * @return the plan produced by the migration visitor
+     * @throws VisitorException if the visitor fails
+     */
+    private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{
+        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);
+        visitor.visit();
+        org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+        return newPlan;
+    }
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=989238&r1=989237&r2=989238&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java Wed Aug 25 17:30:40 2010
@@ -161,7 +161,7 @@ public class TestPigRunner {
                     0)).getAlias());
             assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
                     js).get(0)).getAlias());
-            assertEquals("B,C", js.getAlias()); 
+            assertEquals("B", js.getAlias()); 
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java?rev=989238&r1=989237&r2=989238&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java Wed Aug 25 17:30:40 2010
@@ -95,7 +95,7 @@ public class TestPigStats  {
         assertEquals("D", getAlias(mro));
          
         mro = mp.getSuccessors(mro).get(0);
-        assertEquals("D,E", getAlias(mro));
+        assertEquals("D", getAlias(mro));
     }
     
     private void deleteDirectory( File dir ) {

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan1.dot
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan1.dot?rev=989238&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan1.dot (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan1.dot Wed Aug 25 17:30:40 2010
@@ -0,0 +1,5 @@
+digraph graph1 {
+    LOLoad -> LOSort;
+    LOSort [limit="100"];
+    LOSort -> LOStore;
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan10.dot
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan10.dot?rev=989238&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan10.dot (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan10.dot Wed Aug 25 17:30:40 2010
@@ -0,0 +1,7 @@
+digraph graph1 {
+    LOLoad -> LOLimit;
+    LOLimit [limit="100"];
+    LOLimit -> LOForEach;
+    LOForEach [hasflat="false"];
+    LOForEach -> LOCogroup;
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan2.dot
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan2.dot?rev=989238&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan2.dot (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan2.dot Wed Aug 25 17:30:40 2010
@@ -0,0 +1,5 @@
+digraph graph1 {
+    LOLoad -> LOLimit;
+    LOLimit [limit="10"];
+    LOLimit -> LOStore;
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan3.dot
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan3.dot?rev=989238&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan3.dot (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan3.dot Wed Aug 25 17:30:40 2010
@@ -0,0 +1,10 @@
+digraph graph1 {
+    LOLoad -> LOLimit;
+    LOLimit [limit="100"];
+    LOLimit -> LOCross;
+    LOLoad -> LOLimit;
+    LOLimit [limit="100"];
+    LOLimit -> LOCross;
+    LOCross -> LOLimit;
+    LOLimit [limit="100"];
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan4.dot
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan4.dot?rev=989238&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan4.dot (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan4.dot Wed Aug 25 17:30:40 2010
@@ -0,0 +1,7 @@
+digraph graph1 {
+    LOLoad -> LOCogroup;
+    LOCogroup -> LOForEach;
+    LOForEach [hasflat="true"];
+    LOForEach -> LOLimit;
+    LOLimit [limit="100"];
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan5.dot
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan5.dot?rev=989238&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan5.dot (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan5.dot Wed Aug 25 17:30:40 2010
@@ -0,0 +1,7 @@
+digraph graph1 {
+    LOLoad -> LOLimit;
+    LOLimit [limit="100"];
+    LOLimit -> LOForEach;
+    LOForEach [hasflat="false"];
+    LOForEach -> LOStore;
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan6.dot
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan6.dot?rev=989238&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan6.dot (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan6.dot Wed Aug 25 17:30:40 2010
@@ -0,0 +1,5 @@
+digraph graph1 {
+    LOLoad -> LOLimit;
+    LOLimit [limit="20"];
+    LOLimit -> LOStore;
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan7.dot
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan7.dot?rev=989238&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan7.dot (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan7.dot Wed Aug 25 17:30:40 2010
@@ -0,0 +1,6 @@
+digraph graph1 {
+    LOLoad -> LOForEach;
+    LOForEach [hasflat="true"];
+    LOForEach -> LOLimit;
+    LOLimit [limit="100"];
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan8.dot
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan8.dot?rev=989238&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan8.dot (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan8.dot Wed Aug 25 17:30:40 2010
@@ -0,0 +1,5 @@
+digraph graph1 {
+    LOLoad -> LOSort;
+    LOSort [limit="10"];
+    LOSort -> LOStore;
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan9.dot
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan9.dot?rev=989238&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan9.dot (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/new-optlimitplan9.dot Wed Aug 25 17:30:40 2010
@@ -0,0 +1,5 @@
+digraph graph1 {
+    LOLoad -> LOSort;
+    LOSort [limit="10"];
+    LOSort -> LOStore;
+}