You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/28 00:23:27 UTC

svn commit: r990288 - in /hadoop/pig/trunk: ./ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/expression/ src/org/apache/pig/newplan/logical/optimizer/ src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/

Author: daijy
Date: Fri Aug 27 22:23:26 2010
New Revision: 990288

URL: http://svn.apache.org/viewvc?rev=990288&view=rev
Log:
PIG-1321: Logical Optimizer: Merge cascading foreach

Added:
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=990288&r1=990287&r2=990288&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Aug 27 22:23:26 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1321: Logical Optimizer: Merge cascading foreach (xuefuz via daijy)
+
 PIG-1483: [piggybank] Add HadoopJobHistoryLoader to the piggybank (rding)
 
 PIG-1555: [piggybank] add CSV Loader (dvryaboy)

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java?rev=990288&r1=990287&r2=990288&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java Fri Aug 27 22:23:26 2010
@@ -83,6 +83,10 @@ public abstract class Operator {
         return annotations.remove(key);
     }
     
+    public void setPlan(OperatorPlan p) {
+        plan = p;
+    }
+    
     /**
      * This is like a shallow equals comparison.
      * It returns true if two operators have equivalent properties even if they are 

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java?rev=990288&r1=990287&r2=990288&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java Fri Aug 27 22:23:26 2010
@@ -19,7 +19,12 @@
 package org.apache.pig.newplan.logical.expression;
 
 import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.BaseOperatorPlan;
@@ -58,4 +63,35 @@ public class LogicalExpressionPlan exten
         ExprPrinter npp = new ExprPrinter(this, ps);
         npp.visit();
     }
+    
+    /**
+     * Merge all nodes in lgExpPlan, keep the connections
+     * @param lgExpPlan plan to merge
+     * @return sources of the merged plan
+     */
+    public List<Operator> merge(LogicalExpressionPlan lgExpPlan) throws FrontendException {
+        
+        List<Operator> sources = lgExpPlan.getSources();
+        
+        Iterator<Operator> iter = lgExpPlan.getOperators();
+        while (iter.hasNext()) {
+            LogicalExpression op = (LogicalExpression)iter.next();
+            op.setPlan(this);
+            add(op);
+        }
+        
+        iter = lgExpPlan.getOperators();
+        while (iter.hasNext()) {
+            LogicalExpression startOp = (LogicalExpression)iter.next();
+            ArrayList<Operator> endOps = (ArrayList<Operator>)lgExpPlan.fromEdges.get(startOp);
+            if (endOps!=null) {
+                for (Operator endOp : endOps) {
+                        connect(startOp, endOp);
+                }
+            }
+        }
+        
+        return sources;
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=990288&r1=990287&r2=990288&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Fri Aug 27 22:23:26 2010
@@ -29,6 +29,7 @@ import org.apache.pig.newplan.logical.ru
 import org.apache.pig.newplan.logical.rules.ImplicitSplitInserter;
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
 import org.apache.pig.newplan.logical.rules.MergeFilter;
+import org.apache.pig.newplan.logical.rules.MergeForEach;
 import org.apache.pig.newplan.logical.rules.PushUpFilter;
 import org.apache.pig.newplan.logical.rules.SplitFilter;
 import org.apache.pig.newplan.logical.rules.StreamTypeCastInserter;
@@ -137,6 +138,14 @@ public class LogicalPlanOptimizer extend
         if (!s.isEmpty())
             ls.add(s);
         
+        // Add MergeForEach set
+        s = new HashSet<Rule>();
+        // Add the AddForEach
+        r = new MergeForEach("MergeForEach");
+        checkAndAddRule(s, r);
+        if (!s.isEmpty())
+            ls.add(s);
+        
         return ls;
     }
         

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java?rev=990288&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java Fri Aug 27 22:23:26 2010
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
+
+public class MergeForEach extends Rule {
+
+    private OperatorSubPlan subPlan;
+    
+    public MergeForEach(String name) {
+        super( name, false );
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {
+        // match each foreach.
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator foreach1 = new LOForEach(plan);
+        plan.add( foreach1 );
+        return plan;
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new MergeForEachTransformer();
+    }
+    
+    public class MergeForEachTransformer extends Transformer {
+        @Override
+        public boolean check(OperatorPlan matched) throws FrontendException {
+            LOForEach foreach1 = (LOForEach)matched.getSources().get(0);
+            List<Operator> succs = currentPlan.getSuccessors( foreach1 );
+            if( succs == null || succs.size() != 1 || !( succs.get(0) instanceof LOForEach) )
+                return false;
+            
+            LOForEach foreach2 = (LOForEach)succs.get(0);
+            
+            // Check if the second foreach has only LOGenerate and LOInnerLoad
+            Iterator<Operator> it = foreach2.getInnerPlan().getOperators();
+            while( it.hasNext() ) {
+                Operator op = it.next();
+                if(!(op instanceof LOGenerate) && !(op instanceof LOInnerLoad))
+                    return false;
+            }
+            
+            // Check if the first foreach has flatten in its generate statement.
+            LOGenerate gen1 = (LOGenerate)foreach1.getInnerPlan().getSinks().get(0);
+            for (boolean flatten : gen1.getFlattenFlags()) {
+                if( flatten )
+                    return false;
+            }
+            
+            // Check if non of the 1st foreach output is referred more than once in second foreach.
+            // Otherwise, we may do expression calculation more than once, defeat the benefit of this
+            // optimization
+            Set<Integer> inputs = new HashSet<Integer>();
+            for (Operator op : foreach2.getInnerPlan().getSources()) {
+                // If the source is not LOInnerLoad, then it must be LOGenerate. This happens when 
+                // the 1st ForEach does not rely on any input of 2nd ForEach
+                if (op instanceof LOInnerLoad) {
+                    LOInnerLoad innerLoad = (LOInnerLoad)op;
+                    int input = innerLoad.getProjection().getColNum();
+                    if (inputs.contains(input))
+                        return false;
+                    else
+                        inputs.add(input);
+                    
+                    if (innerLoad.getProjection().isProjectStar())
+                        return false;
+                }
+            }
+            
+            return true;
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            return subPlan;
+        }
+
+        private void addBranchToPlan(LOGenerate gen, int branch, OperatorPlan newPlan) {
+            Operator op = gen.getPlan().getPredecessors(gen).get(branch);
+            newPlan.add(op);
+            op.setPlan(newPlan);
+            Operator pred;
+            if (gen.getPlan().getPredecessors(op)!=null)
+                pred = gen.getPlan().getPredecessors(op).get(0);
+            else
+                pred = null;
+            while (pred!=null) {
+                newPlan.add(pred);
+                pred.setPlan(newPlan);
+                newPlan.connect(pred, op);
+                op = pred;
+                if (gen.getPlan().getPredecessors(pred)!=null)
+                    pred = gen.getPlan().getPredecessors(pred).get(0);
+                else
+                    pred = null;
+            }
+        }
+        
+        @Override
+        public void transform(OperatorPlan matched) throws FrontendException {
+            subPlan = new OperatorSubPlan(currentPlan);
+            
+            LOForEach foreach1 = (LOForEach)matched.getSources().get(0);
+            LOGenerate gen1 = (LOGenerate)foreach1.getInnerPlan().getSinks().get(0);
+            
+            LOForEach foreach2 = (LOForEach)currentPlan.getSuccessors(foreach1).get(0);
+            LOGenerate gen2 = (LOGenerate)foreach2.getInnerPlan().getSinks().get(0);
+            
+            LOForEach newForEach = new LOForEach(currentPlan);
+            LogicalPlan newForEachInnerPlan = new LogicalPlan();
+            newForEach.setInnerPlan(newForEachInnerPlan);
+            newForEach.setAlias(foreach2.getAlias());
+            newForEach.setRequestedParallelism(foreach1.getRequestedParallelisam());
+            List<LogicalExpressionPlan> newExpList = new ArrayList<LogicalExpressionPlan>();
+            LOGenerate newGen = new LOGenerate(newForEachInnerPlan, newExpList, gen2.getFlattenFlags());
+            newForEachInnerPlan.add(newGen);
+            
+            for (LogicalExpressionPlan exp2 : gen2.getOutputPlans()) {
+                LogicalExpressionPlan newExpPlan = new LogicalExpressionPlan();
+                newExpPlan.merge(exp2);
+                
+                // Add expression plan in 2nd ForEach
+                List<Operator> exp2Sinks = new ArrayList<Operator>();
+                exp2Sinks.addAll(newExpPlan.getSinks());
+                for (Operator exp2Sink : exp2Sinks) {
+                    if (exp2Sink instanceof ProjectExpression) {
+                        // Find referred expression plan in 1st ForEach
+                        ProjectExpression proj = (ProjectExpression)exp2Sink;
+                        LOInnerLoad innerLoad = (LOInnerLoad)foreach2.getInnerPlan().getPredecessors(gen2).get(proj.getInputNum());
+                        int exp1Pos = innerLoad.getProjection().getColNum();
+                        LogicalExpressionPlan exp1 = gen1.getOutputPlans().get(exp1Pos);
+                        List<Operator> exp1Sources = newExpPlan.merge(exp1);
+                        
+                        // Copy expression plan to the new ForEach, connect to the expression plan of 2nd ForEach
+                        Operator exp1Source = exp1Sources.get(0);
+                        if (newExpPlan.getPredecessors(exp2Sink)!=null) {
+                            Operator exp2NextToSink = newExpPlan.getPredecessors(exp2Sink).get(0);
+                            newExpPlan.disconnect(exp2NextToSink, exp2Sink);
+                            newExpPlan.remove(exp2Sink);
+                            newExpPlan.connect(exp2NextToSink, exp1Source);
+                        }
+                        else {
+                            newExpPlan.remove(exp2Sink);
+                        }
+                    }
+                }
+                
+                // Copy referred ForEach1 inner plan to new ForEach
+                List<Operator> exp1Sinks = newExpPlan.getSinks();
+                for (Operator exp1Sink : exp1Sinks) {
+                    if (exp1Sink instanceof ProjectExpression) {
+                        addBranchToPlan(gen1, ((ProjectExpression)exp1Sink).getInputNum(), newForEachInnerPlan);
+                        Operator opNextToGen = foreach1.getInnerPlan().getPredecessors(gen1).get(((ProjectExpression)exp1Sink).getInputNum());
+                        newForEachInnerPlan.connect(opNextToGen, newGen);
+                        int input = newForEachInnerPlan.getPredecessors(newGen).indexOf(opNextToGen);
+                        ((ProjectExpression)exp1Sink).setInputNum(input);
+                    }
+                }
+                
+                newExpList.add(newExpPlan);
+            }
+            
+            // Adjust attachedOp
+            for (LogicalExpressionPlan p : newGen.getOutputPlans()) {
+                Iterator<Operator> iter = p.getOperators();
+                while (iter.hasNext()) {
+                    Operator op = iter.next();
+                    if (op instanceof ProjectExpression) {
+                        ((ProjectExpression)op).setAttachedRelationalOp(newGen);
+                    }
+                }
+            }
+            
+            Iterator<Operator> iter = newForEach.getInnerPlan().getOperators();
+            while (iter.hasNext()) {
+                Operator op = iter.next();
+                if (op instanceof LOInnerLoad) {
+                    ((LOInnerLoad)op).getProjection().setAttachedRelationalOp(newForEach);
+                }
+            }
+            // remove foreach1, foreach2, add new foreach
+            Operator pred = currentPlan.getPredecessors(foreach1).get(0);
+            Operator succ = currentPlan.getSuccessors(foreach2).get(0);
+            Pair<Integer, Integer> pos = currentPlan.disconnect(pred, foreach1);
+            currentPlan.disconnect(foreach1, foreach2);
+            currentPlan.disconnect(foreach2, succ);
+            currentPlan.remove(foreach1);
+            currentPlan.remove(foreach2);
+
+            currentPlan.add(newForEach);
+            currentPlan.connect(pred, pos.first, newForEach, pos.second);
+            currentPlan.connect(newForEach, succ);
+            
+            subPlan.add(newForEach);
+        }
+    }
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java?rev=990288&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java Fri Aug 27 22:23:26 2010
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.rules.AddForEach;
+import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
+import org.apache.pig.newplan.logical.rules.MergeForEach;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.test.utils.LogicalPlanTester;
+
+import junit.framework.Assert;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMergeForEachOptimization {
+    LogicalPlan plan = null;
+    PigContext pc = new PigContext( ExecType.LOCAL, new Properties() );
+  
+    private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{
+        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);        
+        visitor.visit();
+        org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+        return newPlan;
+    }
+    
+    @BeforeClass
+    public static void setup() {
+        
+    }
+    
+    @AfterClass
+    public static void tearDown() {
+        
+    }
+    
+    /**
+     * Basic test case. Two simple FOREACH statements can be merged to one.
+     * 
+     * @throws IOException
+     */
+    @Test   
+    public void testSimple() throws IOException  {
+        LogicalPlanTester lpt = new LogicalPlanTester( pc );
+        lpt.buildPlan( "A = load 'file.txt' as (a, b, c);" );
+        lpt.buildPlan( "B = foreach A generate a+b as u, c-b as v;" );
+        lpt.buildPlan( "C = foreach B generate $0+5, v;" );
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store C into 'empty';" );  
+        LogicalPlan newLogicalPlan = migratePlan( plan );
+        
+        int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
+        int outputExprCount1 = getOutputExprCount( newLogicalPlan );
+        LOForEach foreach1 = getForEachOperator( newLogicalPlan );
+        Assert.assertTrue( foreach1.getAlias().equals( "C" ) );
+               
+        PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
+        optimizer.optimize();
+        
+        int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
+        Assert.assertEquals( 1, forEachCount1 - forEachCount2 );
+        int outputExprCount2 = getOutputExprCount( newLogicalPlan );
+        Assert.assertTrue( outputExprCount1 == outputExprCount2 );
+        LOForEach foreach2 = getForEachOperator( newLogicalPlan );
+        Assert.assertTrue( foreach2.getAlias().equals( "C" ) );
+    }
+    
+    /**
+     * Test more complex case where the first for each in the script has inner plan.
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void testComplex() throws IOException {
+        LogicalPlanTester lpt = new LogicalPlanTester( pc );
+        lpt.buildPlan( "A = load 'file.txt' as (a:int, b, c:bag{t:tuple(c0:int,c1:int)});" );
+        lpt.buildPlan( "B = foreach A { S = ORDER c BY $0; generate $0 as u, COUNT(S) as v, SUM(S) as w; };" );
+        lpt.buildPlan( "C = foreach B generate w+5 as x, u-v/2;" );
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store C into 'empty';" );  
+        LogicalPlan newLogicalPlan = migratePlan( plan );
+        
+        int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
+        int outputExprCount1 = getOutputExprCount( newLogicalPlan );
+        LOForEach foreach1 = getForEachOperator( newLogicalPlan );
+        Assert.assertTrue( foreach1.getAlias().equals( "C" ) );
+               
+        PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
+        optimizer.optimize();
+        
+        int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
+        // The number of FOREACHes didn't change because one is genereated because of type cast and
+        // one is reduced because of the merge.
+        Assert.assertEquals( 0, forEachCount1 - forEachCount2 );
+        int outputExprCount2 = getOutputExprCount( newLogicalPlan );
+        Assert.assertTrue( outputExprCount1 == outputExprCount2 );
+        LOForEach foreach2 = getForEachOperator( newLogicalPlan );
+        Assert.assertTrue( foreach2.getAlias().equals( "C" ) );
+    }
+    
+    /**
+     * Not all consecutive FOREACHes can be merged. In this case, the second FOREACH statment
+     * has inner plan, which cannot be merged with one before it.
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void testNegative1() throws IOException {
+        LogicalPlanTester lpt = new LogicalPlanTester( pc );
+        lpt.buildPlan( "A = LOAD 'file.txt' as (a, b, c, d:bag{t:tuple(c0:int,c1:int)});" );
+        lpt.buildPlan( "B = FOREACH A GENERATE a+5 AS u, b-c/2 AS v, d AS w;" );
+        lpt.buildPlan( "C = FOREACH B { S = ORDER w BY $0; GENERATE $0 as x, COUNT(S) as y; };" );
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store C into 'empty';" );  
+        LogicalPlan newLogicalPlan = migratePlan( plan );
+        
+        int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
+               
+        PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
+        optimizer.optimize();
+        int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
+        
+        // Actually MergeForEach optimization is happening here. A new foreach will be inserted after A because
+        // of typ casting. The inserted one and the one in B can be merged due to this optimization. However, 
+        // the plan cannot be further optimized because C has inner plan.
+        Assert.assertEquals( forEachCount1, forEachCount2 );
+    }
+    
+    /**
+     * MergeForEach Optimization is off if the first statement has a FLATTEN operator.
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void testNegative2() throws IOException {
+        LogicalPlanTester lpt = new LogicalPlanTester( pc );
+        lpt.buildPlan( "A = LOAD 'file.txt' as (a, b, c);" );
+        lpt.buildPlan( "B = FOREACH A GENERATE FLATTEN(a), b, c;" );
+        lpt.buildPlan( "C = FOREACH B GENERATE $0, $1+$2;" );
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store C into 'empty';" );  
+        LogicalPlan newLogicalPlan = migratePlan( plan );
+        
+        int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
+               
+        PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
+        optimizer.optimize();
+        
+        int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
+        Assert.assertEquals( 2, forEachCount1 );
+        Assert.assertEquals( 2, forEachCount2 );
+    }
+
+    private int getForEachOperatorCount(LogicalPlan plan) {
+        Iterator<Operator> ops = plan.getOperators();
+        int count = 0;
+        while( ops.hasNext() ) {
+            Operator op = ops.next();
+            if( op instanceof LOForEach )
+                count++;
+        }
+        return count;
+    }
+       
+    private int getOutputExprCount(LogicalPlan plan) throws IOException {
+        LOForEach foreach = getForEachOperator( plan );
+        LogicalPlan inner = foreach.getInnerPlan();
+        List<Operator> ops = inner.getSinks();
+        LOGenerate gen = (LOGenerate)ops.get( 0 );
+        return gen.getOutputPlans().size();
+    }
+    
+    private LOForEach getForEachOperator(LogicalPlan plan) throws IOException {
+        Iterator<Operator> ops = plan.getOperators();
+        while( ops.hasNext() ) {
+            Operator op = ops.next();
+            if( op instanceof LOForEach ) {
+                LOForEach foreach = (LOForEach)op;
+                Operator succ = plan.getSuccessors( foreach ).get( 0 );
+                if( !(succ instanceof LOForEach ) )
+                    return foreach;
+            }
+        }
+        return null;
+    }
+
+    public class MyPlanOptimizer extends LogicalPlanOptimizer {
+        protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
+            super(p, iterations, new HashSet<String>());
+        }
+        
+        protected List<Set<Rule>> buildRuleSets() {            
+            List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+            
+            Set<Rule> s = new HashSet<Rule>();
+            // add split filter rule
+            Rule r = new LoadTypeCastInserter( "TypeCastInserter" );
+            s.add(r);
+            ls.add(s);
+             
+            // Split Set
+            // This set of rules does splitting of operators only.
+            // It does not move operators
+            s = new HashSet<Rule>();
+            r = new AddForEach( "AddForEach" );
+            s.add(r);            
+            ls.add(s);
+            
+            s = new HashSet<Rule>();
+            r = new MergeForEach("MergeForEach");
+            s.add(r);            
+            ls.add(s);
+
+            return ls;
+        }
+    }    
+}