You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sm...@apache.org on 2009/07/02 22:56:22 UTC

svn commit: r790735 - in /hadoop/pig/trunk: src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/optimizer/ src/org/apache/pig/impl/plan/ test/org/apache/pig/test/ test/org/apache/pig/test/utils/

Author: sms
Date: Thu Jul  2 20:56:21 2009
New Revision: 790735

URL: http://svn.apache.org/viewvc?rev=790735&view=rev
Log:
PIG-697: Proposed improvements to pig's optimizer

Added:
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java
Modified:
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectFixerUpper.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestRewire.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java Thu Jul  2 20:56:21 2009
@@ -86,16 +86,20 @@
                         
                         for (FieldSchema schema : cSchema.getFields()) {
                             ++i;
-                            if(nonDuplicates.containsKey(schema.alias))
-                                {
-                                    if(nonDuplicates.get(schema.alias)!=-1) {
-                                        nonDuplicates.remove(schema.alias);
-                                        nonDuplicates.put(schema.alias, -1);
-                                    }
+                            FieldSchema newFS = null;
+                            if(schema.alias != null) {
+                                if(nonDuplicates.containsKey(schema.alias)) {
+                                        if(nonDuplicates.get(schema.alias)!=-1) {
+                                            nonDuplicates.remove(schema.alias);
+                                            nonDuplicates.put(schema.alias, -1);
+                                        }
+                                } else {
+                                    nonDuplicates.put(schema.alias, i);
                                 }
-                            else
-                                nonDuplicates.put(schema.alias, i);
-                            FieldSchema newFS = new FieldSchema(op.getAlias()+"::"+schema.alias,schema.schema,schema.type);
+                                newFS = new FieldSchema(op.getAlias()+"::"+schema.alias,schema.schema,schema.type);
+                            } else {
+                                newFS = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                            }
                             newFS.setParent(schema.canonicalName, op);
                             fss.add(newFS);
                         }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Thu Jul  2 20:56:21 2009
@@ -588,43 +588,7 @@
                             }
                         }
                     } else {
-                        //innerSchema is null; check for schema type
-                        if(DataType.isSchemaType(leafFS.type)) {
-                            //flattening a null schema results in a bytearray
-                            if(mapped) {
-                                //map each flattened column to the original column
-                                if (cast != null) {
-                                    mapFields.put(outputColumn++,
-                                            new ProjectionMap.Column(
-                                                    new Pair<Integer, Integer>(0, inputColumn), true, cast.getType()
-                                            )
-                                    );
-                                } else {
-                                    mapFields.put(outputColumn++,
-                                            new ProjectionMap.Column(new Pair<Integer, Integer>(0, inputColumn))
-                                    );
-                                }
-                            } else {
-                                addedFields.add(outputColumn++);
-                            }
-                        } else {
-                        	if (cast != null) {
-                                mapFields.put(outputColumn++,
-                                        new ProjectionMap.Column(
-                                                new Pair<Integer, Integer>(0, inputColumn), true, cast.getType()
-                                        )
-                                );
-                            } else {
-                                mapFields.put(outputColumn++,
-                                        new ProjectionMap.Column(new Pair<Integer, Integer>(0, inputColumn))
-                                );
-                            }
-                        }
-                    }
-                } else {
-                    //innerSchema is null; check for schema type
-                    if(DataType.isSchemaType(leafFS.type)) {
-                        //flattening a null schema results in a bytearray
+                        //innerSchema is null
                         if(mapped) {
                             //map each flattened column to the original column
                             if (cast != null) {
@@ -641,8 +605,12 @@
                         } else {
                             addedFields.add(outputColumn++);
                         }
-                    } else {
-                    	if (cast != null) {
+                    }
+                } else {
+                    //innerSchema is null
+                    if(mapped) {
+                        //map each flattened column to the original column
+                        if (cast != null) {
                             mapFields.put(outputColumn++,
                                     new ProjectionMap.Column(
                                             new Pair<Integer, Integer>(0, inputColumn), true, cast.getType()
@@ -653,6 +621,8 @@
                                     new ProjectionMap.Column(new Pair<Integer, Integer>(0, inputColumn))
                             );
                         }
+                    } else {
+                        addedFields.add(outputColumn++);
                     }
                 }
             } else {
@@ -785,5 +755,24 @@
             }
         }
     }
+    
+    /**
+     * A helper method to check if the foreach has a flattened element
+     * 
+     * @return a pair: (true if any foreach expression is flattened, false
+     *         otherwise; the list of indexes of the flattened expressions)
+     */
+    public Pair<Boolean, List<Integer>> hasFlatten() {
+        boolean hasFlatten = false;
+        List<Integer> flattenedColumns = new ArrayList<Integer>();
+        for (int i = 0; i < mFlatten.size(); ++i) {
+            Boolean b = mFlatten.get(i);
+            if (b.equals(true)) {
+                hasFlatten = true;
+                flattenedColumns.add(i);
+            }
+        }
+        return new Pair<Boolean, List<Integer>>(hasFlatten, flattenedColumns);
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectFixerUpper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectFixerUpper.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectFixerUpper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectFixerUpper.java Thu Jul  2 20:56:21 2009
@@ -133,7 +133,7 @@
 
                         if (oldNodeMap == null) {
                             // bail out if the projection map is null
-                            int errCode = 2146;
+                            int errCode = 2156;
                             String msg = "Error while fixing projections. Projection map of node to be replaced is null.";
                             throw new VisitorException(msg, errCode,
                                     PigException.BUG);
@@ -148,7 +148,7 @@
                                 .getMappedFields();
                         if (oldNodeMappedFields == null) {
                             // there is no mapping available bail out
-                            int errCode = 2147;
+                            int errCode = 2157;
                             String msg = "Error while fixing projections. No mapping available in old predecessor to replace column.";
                             throw new VisitorException(msg, errCode,
                                     PigException.BUG);
@@ -160,7 +160,7 @@
                         if (columns == null) {
                             // there is no mapping for oldNodeColumn
                             // it could be an added field; bail out
-                            int errCode = 2148;
+                            int errCode = 2158;
                             String msg = "Error during fixing projections. No mapping available in old predecessor for column to be replaced.";
                             throw new VisitorException(msg, errCode,
                                     PigException.BUG);
@@ -179,7 +179,7 @@
                         }
                         if (!foundMapping) {
                             // did not find a mapping - bail out
-                            int errCode = 2149;
+                            int errCode = 2159;
                             String msg = "Error during fixing projections. Could not locate replacement column from the old predecessor.";
                             throw new VisitorException(msg, errCode,
                                     PigException.BUG);
@@ -199,7 +199,7 @@
                         ProjectionMap newNodeMap = mNewNode.getProjectionMap();
                         if (newNodeMap == null) {
                             // did not find a mapping - bail out
-                            int errCode = 2150;
+                            int errCode = 2160;
                             String msg = "Error during fixing projections. Projection map of new predecessor is null.";
                             throw new VisitorException(msg, errCode,
                                     PigException.BUG);
@@ -214,7 +214,7 @@
                                 .getMappedFields();
                         if (newNodeMappedFields == null) {
                             // there is no mapping available bail out
-                            int errCode = 2151;
+                            int errCode = 2161;
                             String msg = "Error during fixing projections. No mapping available in new predecessor to replace column.";
                             throw new VisitorException(msg, errCode,
                                     PigException.BUG);
@@ -228,7 +228,7 @@
                                     .get(key);
                             if (columns == null) {
                                 // should not happen
-                                int errCode = 2152;
+                                int errCode = 2162;
                                 String msg = "Error during fixing projections. Could not locate mapping for column: "
                                         + key + " in new predecessor.";
                                 throw new VisitorException(msg, errCode,
@@ -255,8 +255,10 @@
                         }
                         if (!foundMapping) {
                             // did not find a mapping - bail out
-                            int errCode = 2153;
-                            String msg = "Error during fixing projections. Could not locate replacement column in the new predecessor.";
+                            int errCode = 2163;
+                            String msg = "Error during fixing projections. Could not locate replacement column for column: "
+                                    + oldNodeColumn
+                                    + " in the new predecessor.";
                             throw new VisitorException(msg, errCode,
                                     PigException.BUG);
                         }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Thu Jul  2 20:56:21 2009
@@ -24,6 +24,7 @@
 import org.apache.pig.PigException;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LOFilter;
+import org.apache.pig.impl.logicalLayer.LOForEach;
 import org.apache.pig.impl.logicalLayer.LOLimit;
 import org.apache.pig.impl.logicalLayer.LOLoad;
 import org.apache.pig.impl.logicalLayer.LOPrinter;
@@ -138,6 +139,15 @@
             rule = new Rule<LogicalOperator, LogicalPlan>(rulePlan,
                     new PushUpFilter(plan), "PushUpFilter");
             checkAndAddRule(rule);
+            
+            // Push foreach with flatten down wherever possible
+            rulePlan = new RulePlan();
+            RuleOperator loForeach = new RuleOperator(LOForEach.class,
+                    new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+            rulePlan.add(loForeach);
+            rule = new Rule<LogicalOperator, LogicalPlan>(rulePlan,
+                    new PushDownForeachFlatten(plan), "PushDownForeachFlatten");
+            checkAndAddRule(rule);
         }
         
     }

Added: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java?rev=790735&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java Thu Jul  2 20:56:21 2009
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.logicalLayer.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.impl.logicalLayer.CastFinder;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOCross;
+import org.apache.pig.impl.logicalLayer.LOFRJoin;
+import org.apache.pig.impl.logicalLayer.LOForEach;
+import org.apache.pig.impl.logicalLayer.LOProject;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.UDFFinder;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
+import org.apache.pig.impl.plan.OperatorPlan.IndexHelper;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.PigException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * A visitor to discover if a foreach with flatten(s) can be pushed as low down the tree as
+ * possible.
+ */
+public class PushDownForeachFlatten extends LogicalTransformer {
+
+    // boolean to remember if the foreach has to be swapped
+    private boolean mSwap = false;
+
+    // boolean to remember if the foreach has to be cloned and pushed into one
+    // of the foreach's successor's outputs
+    private boolean mInsertBetween = false;
+    
+    // map of flattened column to its new position in the output
+    Map<Integer, Integer> mFlattenedColumnReMap = null;
+
+    public PushDownForeachFlatten(LogicalPlan plan) {
+        super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
+    }
+
+    /**
+     * 
+     * @return true if the foreach has to be swapped; false otherwise
+     */
+    public boolean getSwap() {
+        return mSwap;
+    }
+
+    /**
+     * 
+     * @return true if the foreach has to be inserted after its successor; false
+     *         otherwise
+     */
+    public boolean getInsertBetween() {
+        return mInsertBetween;
+    }
+    
+    /**
+     * 
+     * @return a map of old column position in the foreach to the column
+     *         position in foreach's successor
+     */
+    public Map<Integer, Integer> getFlattenedColumnMap() {
+        return mFlattenedColumnReMap;
+    }
+
+    @Override
+    public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
+        try {
+            LOForEach foreach = (LOForEach) getOperator(nodes);
+            
+            Pair<Boolean, List<Integer>> flattenResult = foreach.hasFlatten();
+            boolean flattened = flattenResult.first;
+            List<Integer> flattenedColumns = flattenResult.second;
+            Set<Integer> flattenedColumnSet = (flattenedColumns == null? null: new HashSet<Integer>(flattenedColumns));
+
+            if(!flattened) {
+                return false;
+            }
+            
+            if(flattenedColumns == null || flattenedColumns.size() == 0) {
+                return false;
+            }
+            
+            ProjectionMap foreachProjectionMap = foreach.getProjectionMap();
+            
+            if(foreachProjectionMap == null) {
+                return false;
+            }
+            
+            List<Integer> foreachAddedFields = foreachProjectionMap.getAddedFields();
+            if(foreachAddedFields != null) {
+                Set<Integer> foreachAddedFieldsSet = new HashSet<Integer>(foreachAddedFields);
+                flattenedColumnSet.removeAll(foreachAddedFieldsSet);
+            }
+            
+            if(flattenedColumnSet.size() == 0) {
+                return false;
+            }
+            
+            for(LogicalPlan foreachPlan: foreach.getForEachPlans()) {
+                UDFFinder udfFinder = new UDFFinder(foreachPlan);
+                udfFinder.visit();
+    
+                // if any of the foreach's inner plans contain a UDF then return false
+                if (udfFinder.foundAnyUDF()) {
+                    return false;
+                }
+                
+                CastFinder castFinder = new CastFinder(foreachPlan);
+                castFinder.visit();
+
+                // TODO
+                // if any of the foreach's inner plans contain a cast then return false
+                // in the future the cast should be moved appropriately
+                if (castFinder.foundAnyCast()) {
+                    return false;
+                }
+            }
+
+            List<LogicalOperator> successors = (mPlan.getSuccessors(foreach) == null ? null
+                    : new ArrayList<LogicalOperator>(mPlan
+                            .getSuccessors(foreach)));
+
+            // if the foreach has no successors or more than one successor
+            // return false
+            if (successors == null || successors.size() == 0 || successors.size() > 1) {
+                return false;
+            }
+
+            LogicalOperator successor = successors.get(0);
+
+            List<LogicalOperator> peers = (mPlan.getPredecessors(successor) == null ? null
+                    : new ArrayList<LogicalOperator>(mPlan.getPredecessors(successor)));
+            
+            // check if any of the foreach's peers is a foreach flatten
+            // if so then this rule does not apply
+            for(LogicalOperator peer: peers) {
+                if(!peer.equals(foreach)) {
+                    if(peer instanceof LOForEach) {
+                        LOForEach peerForeach = (LOForEach)peer;
+                        if(peerForeach.hasFlatten().first) {
+                            return false;
+                        }
+                    }
+                }
+            }
+            
+            IndexHelper indexHelper = new IndexHelper(peers);
+            Integer foreachPosition = indexHelper.getIndex(foreach);
+            
+            // the foreach with flatten can be swapped with an order by
+            // as the order by will then have fewer records to sort
+            // also the sort does not alter the records that are processed
+            
+            // the foreach with flatten can be pushed down a cross or a join
+            // for the same reason. In this case the foreach has to be first
+            // unflattened and then a new foreach has to be inserted after
+            // the cross or join. In both cross and join the actual columns
+            // from the foreach are not altered but positions might be changed
+            
+            // in the case of union the column is transformed and as a result
+            // the foreach flatten cannot be pushed down
+            
+            // for distinct the output before flattening and the output
+            // after flattening might be different. For example, consider
+            // {(1), (1)}. Distinct of this bag is still {(1), (1)}.
+            // distinct(flatten({(1), (1)})) is (1). However,
+            // flatten(distinct({(1), (1)})) is (1), (1)
+            
+            // in both cases correctness is not affected
+            if(successor instanceof LOSort) {
+                LOSort sort = (LOSort) successor;
+                RequiredFields sortRequiredField = sort.getRequiredFields().get(0);
+                
+                if(sortRequiredField.getNeedAllFields()) {
+                    return false;
+                }
+                
+                List<Pair<Integer, Integer>> sortInputs = sortRequiredField.getFields();
+                Set<Integer> requiredInputs = new HashSet<Integer>(); 
+                for(Pair<Integer, Integer> pair: sortInputs) {
+                    requiredInputs.add(pair.second);
+                }
+                
+                requiredInputs.retainAll(flattenedColumnSet);
+                // the intersection of the sort's required inputs
+                // and the flattened columns in the foreach should
+                // be empty, i.e., the size of required inputs == 0
+                if(requiredInputs.size() != 0) {
+                    return false;
+                }
+                
+                mSwap = true;
+                return true;
+            } else if (successor instanceof LOCross
+                    || successor instanceof LOFRJoin) {
+                
+                List<LogicalOperator> children = mPlan.getSuccessors(successor);
+                
+                if(children == null || children.size() > 1) {
+                    return false;
+                }
+                
+                ProjectionMap succProjectionMap = successor.getProjectionMap();
+                
+                if(succProjectionMap == null) {
+                    return false;
+                }
+                
+                MultiMap<Integer, ProjectionMap.Column> mappedFields = succProjectionMap.getMappedFields();
+                
+                if(mappedFields == null) {
+                    return false;
+                }
+
+                if(mFlattenedColumnReMap == null) {
+                    mFlattenedColumnReMap = new HashMap<Integer, Integer>();
+                }
+
+                // initialize the map
+                for(Integer key: flattenedColumnSet) {
+                    mFlattenedColumnReMap.put(key, Integer.MAX_VALUE);
+                }
+                
+                // for each output column find the corresponding input that matches the foreach's position
+                // for each input column in the foreach check if the output column is a mapping of the flattened column
+                // due to flattening, multiple output columns could be generated from the same input column
+                // find the first, i.e., the lowest, output column that results from each flattened column
+                for(Integer key: mappedFields.keySet()) {
+                    List<ProjectionMap.Column> columns = (List<ProjectionMap.Column>)mappedFields.get(key);
+                    for(ProjectionMap.Column column: columns) {
+                        Pair<Integer, Integer> inputColumn = column.getInputColumn();
+                        
+                        // check if the input (predecessor) number is the same as the
+                        // position of foreach in the list of predecessors
+                        if(foreachPosition.equals(inputColumn.first)) {
+                            if(flattenedColumnSet.contains(inputColumn.second)) {
+                                // check if the output column, i.e., key is the
+                                // least column number seen till date
+                                if(key < mFlattenedColumnReMap.get(inputColumn.second)) {
+                                    mFlattenedColumnReMap.put(inputColumn.second, key);
+                                }
+                            }
+                        }
+                    }
+                }
+                
+                // check if any of the flattened columns is not remapped
+                for(Integer key: mFlattenedColumnReMap.keySet()) {
+                    if(mFlattenedColumnReMap.get(key).equals(Integer.MAX_VALUE)) {
+                        return false;
+                    }
+                }
+                
+                mInsertBetween = true;
+                return true;
+            }
+            
+            return false;
+
+        } catch (OptimizerException oe) {
+            throw oe;
+        } catch (Exception e) {
+            int errCode = 2152;
+            String msg = "Internal error while trying to check if foreach with flatten can be pushed down.";
+            throw new OptimizerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    private LogicalOperator getOperator(List<LogicalOperator> nodes)
+            throws FrontendException {
+        if ((nodes == null) || (nodes.size() <= 0)) {
+            int errCode = 2052;
+            String msg = "Internal error. Cannot retrieve operator from null or empty list.";
+            throw new OptimizerException(msg, errCode, PigException.BUG);
+        }
+
+        LogicalOperator lo = nodes.get(0);
+        if (lo == null || !(lo instanceof LOForEach)) {
+            // we should never be called with any other operator class name
+            int errCode = 2005;
+            String msg = "Expected " + LOForEach.class.getSimpleName()
+                    + ", got "
+                    + (lo == null ? lo : lo.getClass().getSimpleName());
+            throw new OptimizerException(msg, errCode, PigException.INPUT);
+        } else {
+            return lo;
+        }
+
+    }
+
+    @Override
+    public void transform(List<LogicalOperator> nodes)
+            throws OptimizerException {
+        try {
+            LOForEach foreach = (LOForEach) getOperator(nodes);
+            LogicalOperator successor = mPlan.getSuccessors(foreach).get(0);
+            if (mSwap) {
+                mPlan.swap(successor, foreach);
+            } else if (mInsertBetween) {
+                // mark the flattened columns as not flattened in the foreach
+                // create a new foreach operator that projects each column of the
+                // successor. Mark the remapped flattened columns as flattened
+                // in the new foreach operator
+                
+                if(mFlattenedColumnReMap == null) {
+                    int errCode = 2153;
+                    String msg = "Internal error. The mapping for the flattened columns is empty";
+                    throw new OptimizerException(msg, errCode, PigException.BUG);
+                }
+                
+                // set flatten to false for all columns in the mapping
+                
+                ArrayList<Boolean> flattenList = (ArrayList<Boolean>)foreach.getFlatten();                
+                for(Integer key: mFlattenedColumnReMap.keySet()) {
+                    flattenList.set(key, false);
+                }
+                
+                // rebuild schemas of the foreach and the successor after the foreach modification
+                foreach.regenerateSchema();
+                successor.regenerateSchema();
+                
+                Schema successorSchema = successor.getSchema();
+                
+                if(successorSchema == null) {
+                    int errCode = 2154;
+                    String msg = "Internal error. Schema of successor cannot be null for pushing down foreach with flatten.";
+                    throw new OptimizerException(msg, errCode, PigException.BUG);
+                }
+                
+                flattenList = new ArrayList<Boolean>();
+                
+                ArrayList<LogicalPlan> foreachInnerPlans = new ArrayList<LogicalPlan>();
+                
+                for(int i = 0; i < successorSchema.size(); ++i) {
+                    LogicalPlan innerPlan = new LogicalPlan();
+                    LOProject project = new LOProject(innerPlan, OperatorKey
+                            .genOpKey(foreach.getOperatorKey().scope),
+                            successor, i);
+                    innerPlan.add(project);
+                    foreachInnerPlans.add(innerPlan);
+                    flattenList.add(false);
+                }
+                
+                // set the flattened remapped column to true
+                for(Integer key: mFlattenedColumnReMap.keySet()) {
+                    Integer value = mFlattenedColumnReMap.get(key);
+                    flattenList.set(value, true);
+                }            
+                
+                
+                LOForEach newForeach = new LOForEach(mPlan, OperatorKey
+                        .genOpKey(foreach.getOperatorKey().scope), foreachInnerPlans,
+                        flattenList);
+                
+                // add the new foreach to the plan
+                mPlan.add(newForeach);
+                
+                // insert the new foreach between the successor and the successor's successor
+                mPlan.insertBetween(successor, newForeach, mPlan.getSuccessors(successor).get(0));             
+            }
+        } catch (OptimizerException oe) {
+            throw oe;
+        } catch (Exception e) {
+            int errCode = 2155;
+            String msg = "Internal error while pushing foreach with flatten down.";
+            throw new OptimizerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void reset() {
+        mInsertBetween = false;
+        mSwap = false;
+        mFlattenedColumnReMap = null;
+    }
+
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java Thu Jul  2 20:56:21 2009
@@ -289,7 +289,7 @@
         LogicalOperator lo = nodes.get(0);
         if (lo == null || !(lo instanceof LOFilter)) {
             // we should never be called with any other operator class name
-            int errCode = 1101;
+            int errCode = 2005;
             String msg = "Expected " + LOFilter.class.getSimpleName()
                     + ", got "
                     + (lo == null ? lo : lo.getClass().getSimpleName());

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Thu Jul  2 20:56:21 2009
@@ -1381,7 +1381,7 @@
     /*
      * A helper class that computes the index of each reference in a list for a quick lookup
      */
-    class IndexHelper <E> {
+    public static class IndexHelper <E> {
         
         private Map<E, Integer> mIndex = null;
         

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java?rev=790735&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java Thu Jul  2 20:56:21 2009
@@ -0,0 +1,929 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.FilterFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.*;
+import org.apache.pig.impl.logicalLayer.optimizer.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.test.utils.Identity;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+
+import org.junit.Test;
+import org.junit.Before;
+
+/**
+ * Tests the PushDownForeachFlatten rule of the logical optimizer.
+ */
+
+public class TestPushDownForeachFlatten extends junit.framework.TestCase {
+
+    final String FILE_BASE_LOCATION = "test/org/apache/pig/test/data/DotFiles/" ;
+    static final int MAX_SIZE = 100000;
+
+    private final Log log = LogFactory.getLog(getClass());
+    LogicalPlanTester planTester = new LogicalPlanTester() ;
+    
+
+    private static final String simpleEchoStreamingCommand; // perl one-liner used by the stream test, quoted per OS
+    static {
+        if (System.getProperty("os.name").toUpperCase(java.util.Locale.ENGLISH).startsWith("WINDOWS")) // fixed locale: the default-locale upper-casing can break this match (e.g. Turkish)
+            simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'"; // Windows variant: inner quotes carry a backslash
+        else
+            simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+    }
+
+    
+    @Override
+    public void tearDown() { // JUnit 3 TestCase hook; the JUnit 4 @Before annotation had no effect on this class
+        planTester.reset();
+    }
+
+    /**
+     * A simple filter UDF for testing.
+     * Its {@link #exec(Tuple)} ignores the input tuple and always
+     * returns {@code false}.
+     */
+    public static class MyFilterFunc extends FilterFunc {
+        
+        @Override
+        public Boolean exec(Tuple input) {
+            return false;
+        }
+    }
+    
+    @Test
+    //Test to ensure that the right exception is thrown when the input list is empty
+    public void testErrorEmptyInput() throws Exception {
+        LogicalPlan lp = new LogicalPlan();
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        try {
+            pushDownForeach.check(lp.getRoots()); // roots of a freshly created plan
+            fail("Exception Expected!");
+        } catch(Exception e) {
+            assertEquals(2052, ((OptimizerException)e).getErrorCode()); // reports the actual code on failure
+        }
+    }
+
+    @Test
+    //Test to ensure that the right exception is thrown when the input is not a foreach
+    public void testErrorNonForeachInput() throws Exception {
+        LogicalPlan lp = planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        try {
+            pushDownForeach.check(lp.getRoots()); // the plan holds only the load, no foreach
+            fail("Exception Expected!");
+        } catch(Exception e) {
+            assertEquals(2005, ((OptimizerException)e).getErrorCode()); // reports the actual code on failure
+        }
+    }
+    
+    @Test
+    public void testForeachNoFlatten() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        LogicalPlan lp = planTester.buildPlan("B = foreach A generate $1;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        // the foreach has no flatten: check() must decline and record no state
+        assertFalse(pushDownForeach.check(lp.getLeaves()));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+        
+    }
+    
+    @Test
+    public void testForeachNoSuccessors() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        LogicalPlan lp = planTester.buildPlan("B = foreach A generate flatten($1);");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        // the flattening foreach is the plan's leaf (nothing follows it): check() must decline
+        assertFalse(pushDownForeach.check(lp.getLeaves()));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+        
+    }
+    
+    @Test
+    public void testForeachStreaming() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate flatten($1);");
+        LogicalPlan lp = planTester.buildPlan("C = stream B through `" + simpleEchoStreamingCommand + "`;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        
+        LOLoad load = (LOLoad) lp.getRoots().get(0);
+        // the flattening foreach is followed by a stream: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(load)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+        
+    }
+    
+    @Test
+    public void testForeachDistinct() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate flatten($1);");
+        LogicalPlan lp = planTester.buildPlan("C = distinct B;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        
+        LOLoad load = (LOLoad) lp.getRoots().get(0);
+        // the flattening foreach is followed by a distinct: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(load)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+        
+    }
+    
+    @Test
+    public void testForeachForeach() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
+        LogicalPlan lp = planTester.buildPlan("C = foreach B generate $0;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        
+        LOLoad load = (LOLoad) lp.getRoots().get(0);
+        // the flattening foreach is followed by another foreach: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(load)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+    }
+    
+
+    @Test
+    public void testForeachFilter() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+        LogicalPlan lp = planTester.buildPlan("C = filter B by $1 < 18;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        
+        LOLoad load = (LOLoad) lp.getRoots().get(0);
+        // the flattening foreach is followed by a filter: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(load)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+        
+    }
+
+    @Test
+    public void testForeachSplitOutput() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+        LogicalPlan lp = planTester.buildPlan("split B into C if $1 < 18, D if $1 >= 18;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        
+        LOLoad load = (LOLoad) lp.getRoots().get(0);
+        // the flattening foreach feeds a split: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(load)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+        
+    }
+
+    @Test
+    public void testForeachLimit() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+        LogicalPlan lp = planTester.buildPlan("B = limit B 10;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        
+        LOLoad load = (LOLoad) lp.getRoots().get(0);
+        // the flattening foreach is followed by a limit: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(load)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+    }
+
+    @Test
+    public void testForeachUnion() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
+        LogicalPlan lp = planTester.buildPlan("D = union B, C;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        
+        LOLoad load = (LOLoad) lp.getRoots().get(0);
+        // the flattening foreach is one input of a union: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(load)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+    }
+    
+    @Test
+    public void testForeachCogroup() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
+        LogicalPlan lp = planTester.buildPlan("D = cogroup B by $0, C by $0;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        
+        LOLoad load = (LOLoad) lp.getRoots().get(0);
+        // the flattening foreach is one input of a cogroup: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(load)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+    }
+    
+    @Test
+    public void testForeachGroupBy() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+        LogicalPlan lp = planTester.buildPlan("C = group B by $0;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        
+        LOLoad load = (LOLoad) lp.getRoots().get(0);
+        // the flattening foreach is followed by a group: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(load)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+    }
+    
+    @Test
+    public void testForeachSort() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+        LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+        LOSort sort = (LOSort) lp.getLeaves().get(0);
+        LOForEach foreach = (LOForEach)lp.getPredecessors(sort).get(0);
+        // flattening foreach directly followed by an order-by: expect a swap, no insert, no remap
+        assertTrue(pushDownForeach.check(lp.getPredecessors(sort)));
+        assertTrue(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+        
+        pushDownForeach.transform(lp.getPredecessors(sort));
+        // after the transform the foreach is the leaf and the sort is its predecessor
+        assertEquals(foreach, lp.getLeaves().get(0));
+        assertEquals(sort, lp.getPredecessors(foreach).get(0));
+        
+    }
+    
+    @Test
+    public void testForeachFlattenAddedColumnSort() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
+        LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        
+        LOSort sort = (LOSort) lp.getLeaves().get(0);
+        // flatten(1) flattens a constant, not an input column: check() must decline
+        assertFalse(pushDownForeach.check(lp.getPredecessors(sort)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+        
+    }
+    
+    @Test
+    public void testForeachUDFSort() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $0, $1, " + Identity.class.getName() + "($2) ;");
+        LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        
+        LOSort sort = (LOSort) lp.getLeaves().get(0);
+        // the foreach applies a UDF and has no flatten: check() must decline
+        assertFalse(pushDownForeach.check(lp.getPredecessors(sort)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+        
+    }
+    
+    @Test
+    public void testForeachCastSort() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate (chararray)$0, $1, flatten($2);");
+        LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        
+        LOSort sort = (LOSort) lp.getLeaves().get(0);
+        // foreach with a cast plus a flatten, followed by an order-by: check() must decline
+        assertFalse(pushDownForeach.check(lp.getPredecessors(sort)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+        
+    }
+    
+    @Test
+    public void testForeachCross() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
+        planTester.buildPlan("D = cross B, C;");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+        LOLoad load = (LOLoad) lp.getRoots().get(0);
+        LOLimit limit = (LOLimit) lp.getLeaves().get(0);
+        LOCross cross = (LOCross)lp.getPredecessors(limit).get(0);
+        LOForEach foreach = (LOForEach) lp.getPredecessors(cross).get(0);
+        
+        Schema limitSchema = limit.getSchema();
+        // flattening foreach as first input of a cross: expect insert-between (no swap) and a column remap
+        assertTrue(pushDownForeach.check(lp.getSuccessors(load)));
+        assertFalse(pushDownForeach.getSwap());
+        assertTrue(pushDownForeach.getInsertBetween());
+        assertNotNull(pushDownForeach.getFlattenedColumnMap());
+
+        pushDownForeach.transform(lp.getSuccessors(load));
+        
+        planTester.rebuildSchema(lp);
+        // the original foreach must no longer flatten any column
+        for(Boolean b: foreach.getFlatten()) {
+            assertFalse(b.booleanValue());
+        }
+        
+        LOForEach newForeach = (LOForEach)lp.getSuccessors(cross).get(0);
+        
+        
+        List<Boolean> newForeachFlatten = newForeach.getFlatten();
+        Map<Integer, Integer> remap = pushDownForeach.getFlattenedColumnMap();
+        // every remapped column index must be flagged as flattened in the new foreach
+        for(Integer value: remap.values()) {
+            assertTrue(newForeachFlatten.get(value).booleanValue());
+        }
+        // the limit's schema must compare equal before and after the transform
+        assertTrue(Schema.equals(limitSchema, limit.getSchema(), false, true));
+        
+    }
+
+    @Test
+    public void testForeachCross1() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
+        planTester.buildPlan("D = cross A, C;");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+        LOLoad load = (LOLoad) lp.getRoots().get(1);
+        LOLimit limit = (LOLimit) lp.getLeaves().get(0);
+        LOCross cross = (LOCross)lp.getPredecessors(limit).get(0);
+        LOForEach foreach = (LOForEach) lp.getPredecessors(cross).get(1);
+        
+        Schema limitSchema = limit.getSchema();
+        // flattening foreach as second input of a cross: expect insert-between (no swap) and a column remap
+        assertTrue(pushDownForeach.check(lp.getSuccessors(load)));
+        assertFalse(pushDownForeach.getSwap());
+        assertTrue(pushDownForeach.getInsertBetween());
+        assertNotNull(pushDownForeach.getFlattenedColumnMap());
+
+        pushDownForeach.transform(lp.getSuccessors(load));
+        
+        planTester.rebuildSchema(lp);
+        // the original foreach must no longer flatten any column
+        for(Boolean b: foreach.getFlatten()) {
+            assertFalse(b.booleanValue());
+        }
+        
+        LOForEach newForeach = (LOForEach)lp.getSuccessors(cross).get(0);
+        
+        
+        List<Boolean> newForeachFlatten = newForeach.getFlatten();
+        Map<Integer, Integer> remap = pushDownForeach.getFlattenedColumnMap();
+        // every remapped column index must be flagged as flattened in the new foreach
+        for(Integer value: remap.values()) {
+            assertTrue(newForeachFlatten.get(value).booleanValue());
+        }
+        // the limit's schema must compare equal before and after the transform
+        assertTrue(Schema.equals(limitSchema, limit.getSchema(), false, true));
+        
+    }
+
+    // TODO
+    // The following test case testForeachCross2 has multiple foreach flatten
+    // A new rule should optimize this case
+    @Test
+    public void testForeachCross2() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
+        planTester.buildPlan("E = cross B, D;");
+        LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        // both inputs of the cross are flattening foreach operators: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(loada)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+
+    }
+    
+    @Test
+    public void testForeachFlattenAddedColumnCross() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("D = cross B, C;");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        // flatten(1) flattens a constant, not an input column: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(loada)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+
+    }
+
+    @Test
+    public void testForeachUDFCross() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("D = cross B, C;");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        // foreach with a UDF next to the flatten, feeding a cross: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(loada)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+
+    }
+
+    @Test
+    public void testForeachCastCross() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, (int)$1, $2;");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("D = cross B, C;");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        // foreach with a cast and no flatten, feeding a cross: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(loada)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+
+    }
+    
+    @Test
+    public void testForeachFRJoin() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
+        planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+        LOLoad load = (LOLoad) lp.getRoots().get(0);
+        LOLimit limit = (LOLimit) lp.getLeaves().get(0);
+        LOFRJoin frjoin = (LOFRJoin)lp.getPredecessors(limit).get(0);
+        LOForEach foreach = (LOForEach) lp.getPredecessors(frjoin).get(0);
+        
+        Schema limitSchema = limit.getSchema();
+        // flattening foreach as first input of a replicated join: expect insert-between (no swap) and a column remap
+        assertTrue(pushDownForeach.check(lp.getSuccessors(load)));
+        assertFalse(pushDownForeach.getSwap());
+        assertTrue(pushDownForeach.getInsertBetween());
+        assertNotNull(pushDownForeach.getFlattenedColumnMap());
+
+        pushDownForeach.transform(lp.getSuccessors(load));
+        
+        planTester.rebuildSchema(lp);
+        // the original foreach must no longer flatten any column
+        for(Boolean b: foreach.getFlatten()) {
+            assertFalse(b.booleanValue());
+        }
+        
+        LOForEach newForeach = (LOForEach)lp.getSuccessors(frjoin).get(0);
+        
+        
+        List<Boolean> newForeachFlatten = newForeach.getFlatten();
+        Map<Integer, Integer> remap = pushDownForeach.getFlattenedColumnMap();
+        // every remapped column index must be flagged as flattened in the new foreach
+        for(Integer value: remap.values()) {
+            assertTrue(newForeachFlatten.get(value).booleanValue());
+        }
+        // the limit's schema must compare equal before and after the transform
+        assertTrue(Schema.equals(limitSchema, limit.getSchema(), false, true));
+
+    }
+    
+
+    @Test
+    public void testForeachFRJoin1() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
+        planTester.buildPlan("D = join A by $0, C by $0 using \"replicated\";");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+        LOLoad load = (LOLoad) lp.getRoots().get(1);
+        LOLimit limit = (LOLimit) lp.getLeaves().get(0);
+        LOFRJoin frjoin = (LOFRJoin)lp.getPredecessors(limit).get(0);
+        LOForEach foreach = (LOForEach) lp.getPredecessors(frjoin).get(1);
+        
+        Schema limitSchema = limit.getSchema();
+        // flattening foreach as second input of a replicated join: expect insert-between (no swap) and a column remap
+        assertTrue(pushDownForeach.check(lp.getSuccessors(load)));
+        assertFalse(pushDownForeach.getSwap());
+        assertTrue(pushDownForeach.getInsertBetween());
+        assertNotNull(pushDownForeach.getFlattenedColumnMap());
+
+        pushDownForeach.transform(lp.getSuccessors(load));
+        
+        planTester.rebuildSchema(lp);
+        // the original foreach must no longer flatten any column
+        for(Boolean b: foreach.getFlatten()) {
+            assertFalse(b.booleanValue());
+        }
+        
+        LOForEach newForeach = (LOForEach)lp.getSuccessors(frjoin).get(0);
+        
+        
+        List<Boolean> newForeachFlatten = newForeach.getFlatten();
+        Map<Integer, Integer> remap = pushDownForeach.getFlattenedColumnMap();
+        // every remapped column index must be flagged as flattened in the new foreach
+        for(Integer value: remap.values()) {
+            assertTrue(newForeachFlatten.get(value).booleanValue());
+        }
+        // the limit's schema must compare equal before and after the transform
+        assertTrue(Schema.equals(limitSchema, limit.getSchema(), false, true));
+
+    }
+
+    // TODO
+    // The following test case testForeachFRJoin2 has multiple foreach flatten
+    // A new rule should optimize this case
+    @Test
+    public void testForeachFRJoin2() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
+        planTester.buildPlan("E = join B by $0, D by $0 using \"replicated\";");
+        LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        // both inputs of the replicated join are flattening foreach operators: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(loada)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+
+    }
+    
+    @Test
+    public void testForeachFlattenAddedColumnFRJoin() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        // flatten(1) flattens a constant, not an input column: check() must decline
+        assertFalse(pushDownForeach.check(lp.getSuccessors(loada)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+        assertNull(pushDownForeach.getFlattenedColumnMap());
+
+    }
+
+    @Test
+    public void testForeachUDFFRJoin() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        // Scenario: B flattens $1 and also applies the Identity function to $2 ahead of a replicated join.
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        // Build the rule under test directly on the plan.
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        // Root 0 is taken to be the load for A -- assumes getRoots() follows script order (TODO confirm).
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        // Expect no match: check() is false, both flags stay false and there is no flattened column map.
+        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+        assertTrue(pushDownForeach.getSwap() == false);
+        assertTrue(pushDownForeach.getInsertBetween() == false);
+        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+    }
+
+    @Test
+    public void testForeachCastFRJoin() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, (int)$1, flatten($2);");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        // Scenario: B casts $1 to int next to flatten($2) ahead of a replicated join; prepare the plan.
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        // Build the rule under test directly on the plan.
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        // Root 0 is taken to be the load for A -- assumes getRoots() follows script order (TODO confirm).
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        // Expect no match: check() is false, both flags stay false and there is no flattened column map.
+        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+        assertTrue(pushDownForeach.getSwap() == false);
+        assertTrue(pushDownForeach.getInsertBetween() == false);
+        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+    }
+
+    @Test
+    public void testForeachInnerJoin() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("D = join B by $0, C by $0;");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        // Scenario: B flattens $2 and feeds a join written without a using clause; prepare the plan.
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        // Build the rule under test directly on the plan.
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        // Root 0 is taken to be the load for A -- assumes getRoots() follows script order (TODO confirm).
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        // Expect no match: check() is false, both flags stay false and there is no flattened column map.
+        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+        assertTrue(pushDownForeach.getSwap() == false);
+        assertTrue(pushDownForeach.getInsertBetween() == false);
+        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+    }
+    
+    @Test
+    public void testForeachInnerJoin1() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
+        planTester.buildPlan("D = join A by $0, C by $0;");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        // Scenario: the flattening foreach (C) is the second input of a join without a using clause.
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        // Build the rule under test directly on the plan.
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        // Root 1 is taken to be the load for B -- assumes getRoots() follows script order (TODO confirm).
+        LOLoad loadb = (LOLoad) lp.getRoots().get(1);
+        // Expect no match: check() is false, both flags stay false and there is no flattened column map.
+        assertTrue(!pushDownForeach.check(lp.getSuccessors(loadb)));
+        assertTrue(pushDownForeach.getSwap() == false);
+        assertTrue(pushDownForeach.getInsertBetween() == false);
+        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+    }
+
+
+    // TODO: a new rule should optimize this case.
+    // Both inputs of the join (B and D) are foreach statements with a flatten.
+    // The assertions pin the current behavior: the rule does not match this plan.
+    @Test
+    public void testForeachInnerJoin2() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
+        planTester.buildPlan("E = join B by $0, D by $0;");
+        LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
+        // Hand the plan to the tester, set its projection maps and rebuild its schemas.
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        // Build the rule under test directly on the plan.
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        // Root 0 is taken to be the load for A -- assumes getRoots() follows script order (TODO confirm).
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        // Expect no match: check() is false, both flags stay false and there is no flattened column map.
+        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+        assertTrue(pushDownForeach.getSwap() == false);
+        assertTrue(pushDownForeach.getInsertBetween() == false);
+        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+    }
+    
+    @Test
+    public void testForeachFlattenAddedColumnInnerJoin() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("D = join B by $0, C by $0;");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        // Scenario: B flattens the constant 1 (not a column of A) ahead of a join without a using clause.
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        // Build the rule under test directly on the plan.
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        // Root 0 is taken to be the load for A -- assumes getRoots() follows script order (TODO confirm).
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        // Expect no match: check() is false, both flags stay false and there is no flattened column map.
+        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+        assertTrue(pushDownForeach.getSwap() == false);
+        assertTrue(pushDownForeach.getInsertBetween() == false);
+        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+    }
+
+    @Test
+    public void testForeachUDFInnerJoin() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("D = join B by $0, C by $0;");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        // Scenario: B flattens $1 and applies the Identity function to $2 ahead of a join without a using clause.
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        // Build the rule under test directly on the plan.
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        // Root 0 is taken to be the load for A -- assumes getRoots() follows script order (TODO confirm).
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        // Expect no match: check() is false, both flags stay false and there is no flattened column map.
+        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+        assertTrue(pushDownForeach.getSwap() == false);
+        assertTrue(pushDownForeach.getInsertBetween() == false);
+        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+    }
+
+    @Test
+    public void testForeachCastInnerJoin() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+        planTester.buildPlan("B = foreach A generate $0, (int)$1, flatten($2);");
+        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+        planTester.buildPlan("D = join B by $0, C by $0;");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        // Scenario: B casts $1 to int next to flatten($2) ahead of a join without a using clause.
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        // Build the rule under test directly on the plan.
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+        // Root 0 is taken to be the load for A -- assumes getRoots() follows script order (TODO confirm).
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        // Expect no match: check() is false, both flags stay false and there is no flattened column map.
+        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+        assertTrue(pushDownForeach.getSwap() == false);
+        assertTrue(pushDownForeach.getInsertBetween() == false);
+        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+    }
+
+}
+

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java Thu Jul  2 20:56:21 2009
@@ -88,7 +88,7 @@
             pushUpFilter.check(lp.getRoots());
             fail("Exception Expected!");
         } catch(Exception e) {
-            assertTrue(((OptimizerException)e).getErrorCode() == 1101);
+            assertTrue(((OptimizerException)e).getErrorCode() == 2005);
         }
     }
     

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestRewire.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestRewire.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestRewire.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestRewire.java Thu Jul  2 20:56:21 2009
@@ -366,7 +366,7 @@
             fail("Expected failure.");
         } catch (Exception e) {
             PigException pe = LogUtils.getPigException(e);
-            assertTrue(pe.getErrorCode() == 2146);
+            assertTrue(pe.getErrorCode() == 2156);
         }        
     }
     
@@ -392,7 +392,7 @@
             fail("Expected failure.");
         } catch(Exception e) {
             PigException pe = LogUtils.getPigException(e);
-            assertTrue(pe.getErrorCode() == 2148);
+            assertTrue(pe.getErrorCode() == 2158);
         }
         
     }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java Thu Jul  2 20:56:21 2009
@@ -21,6 +21,8 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.logicalLayer.*;
 import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
+import org.apache.pig.impl.logicalLayer.optimizer.SchemaCalculator;
+import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
 import org.apache.pig.impl.logicalLayer.validators.TypeCheckingValidator;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.PlanValidationException;
@@ -247,6 +249,19 @@
         ProjectionMapCalculator pmc = new ProjectionMapCalculator(lp);
         pmc.visit();
     }
-
-
+    
+    public void rebuildProjectionMap(LogicalPlan lp) throws VisitorException { // two visitor passes over lp
+        ProjectionMapRemover pmr = new ProjectionMapRemover(lp); // pass 1: remover visitor
+        pmr.visit(); // NOTE(review): presumably clears existing projection maps -- confirm in ProjectionMapRemover
+        ProjectionMapCalculator pmc = new ProjectionMapCalculator(lp); // pass 2: calculator visitor, same plan
+        pmc.visit(); // runs only after the remover pass has completed
+    }
+    
+    public void rebuildSchema(LogicalPlan lp) throws VisitorException { // two visitor passes over lp
+        SchemaRemover sr = new SchemaRemover(lp); // pass 1: remover visitor
+        sr.visit(); // NOTE(review): presumably clears existing operator schemas -- confirm in SchemaRemover
+        SchemaCalculator sc = new SchemaCalculator(lp); // pass 2: calculator visitor, same plan
+        sc.visit(); // runs only after the remover pass has completed
+    }
+    
 }