You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/03/15 04:28:28 UTC

svn commit: r923043 [3/5] - in /hadoop/pig/trunk: src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/logical/ src/org/apache/pig/experimental/logical/expression/ src/org/apache/pig/experimental/logical/optimizer/ src/org...

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnPruneHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnPruneHelper.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnPruneHelper.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnPruneHelper.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LOCogroup;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOForEach;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LOInnerLoad;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LOStore;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalPlanVisitor;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.relational.SchemaNotDefinedException;
+import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.ReverseDependencyOrderWalker;
+
+/**
+ * Helper class used by ColumnMapKeyPrune to figure out what columns can be pruned.
+ * It doesn't make any changes to the operator plan
+ *
+ */
+public class ColumnPruneHelper {
+    protected static final String INPUTUIDS = "ColumnPrune:InputUids";
+    protected static final String OUTPUTUIDS = "ColumnPrune:OutputUids";    
+    protected static final String REQUIREDCOLS = "ColumnPrune:RequiredColumns";
+    
+    private OperatorPlan currentPlan;
+    private OperatorSubPlan subPlan;
+
+    /**
+     * Creates a helper that analyzes the given plan for prunable columns.
+     *
+     * @param currentPlan the operator plan to analyze
+     */
+    public ColumnPruneHelper(OperatorPlan currentPlan) {
+        this.currentPlan = currentPlan;
+    }    
+    
+    /**
+     * Builds a sub-plan holding every LOForEach of the current plan together
+     * with all operators each of them depends on, directly or transitively.
+     * The sub-plan is created over the base plan when the current plan is
+     * itself a sub-plan, and over the current plan otherwise.
+     *
+     * @return the sub-plan; nothing is added to it when the current plan
+     *         contains no LOForEach
+     * @throws IOException propagated from the plan traversal
+     */
+    private OperatorSubPlan getSubPlan() throws IOException {
+        OperatorPlan base = currentPlan;
+        if (currentPlan instanceof OperatorSubPlan) {
+            base = ((OperatorSubPlan)currentPlan).getBasePlan();
+        }
+        OperatorSubPlan result = new OperatorSubPlan(base);
+
+        for (Iterator<Operator> it = currentPlan.getOperators(); it.hasNext(); ) {
+            Operator candidate = it.next();
+            if (candidate instanceof LOForEach) {
+                addOperator(candidate, result);
+            }
+        }
+
+        return result;
+    }
+    
+    /**
+     * Adds an operator and, recursively, all of its predecessors in the
+     * current plan to the given sub-plan.
+     *
+     * @param op operator to add; a null operator is ignored
+     * @param subplan sub-plan that receives the operators
+     * @throws IOException propagated from the predecessor lookup
+     */
+    private void addOperator(Operator op, OperatorSubPlan subplan) throws IOException {
+        if (op != null) {
+            subplan.add(op);
+
+            List<Operator> preds = currentPlan.getPredecessors(op);
+            if (preds != null) {
+                for (Operator upstream : preds) {
+                    addOperator(upstream, subplan);
+                }
+            }
+        }
+    }
+    
+        
+    /**
+     * Analyzes the plan and decides whether any load has columns that can be pruned.
+     * Every LOLoad at the source of the foreach sub-plan that needs fewer columns
+     * than its schema provides is annotated with the set of required column
+     * indexes under REQUIREDCOLS.
+     *
+     * @return true if at least one load was annotated; false if the plan has no
+     *         sources, the first source already carries an INPUTUIDS annotation
+     *         (the rule has run before), the plan has no foreach, an operator
+     *         has no schema, or every load needs all of its columns
+     * @throws IOException if the dependency analysis fails
+     */
+    @SuppressWarnings("unchecked")
+    public boolean check() throws IOException {
+        List<Operator> sources = currentPlan.getSources();
+        // an empty plan has no load to prune; bail out before touching sources.get(0)
+        if (sources == null || sources.isEmpty()) {
+            return false;
+        }
+
+        // if this rule has run before, just return false
+        if (sources.get(0).getAnnotation(INPUTUIDS) != null) {
+            return false;
+        }
+
+        // create sub-plan that ends with foreach
+        subPlan = getSubPlan();
+        if (subPlan.size() == 0) {
+            return false;
+        }
+
+        ColumnDependencyVisitor v = new ColumnDependencyVisitor(subPlan);
+        try {
+            v.visit();
+        } catch(SchemaNotDefinedException e) {
+            // if any operator has an unknown schema, just return false
+            return false;
+        }
+
+        boolean found = false;
+        for(Operator op: subPlan.getSources()) {
+            if (op instanceof LOLoad) {
+                Set<Long> uids = (Set<Long>)op.getAnnotation(INPUTUIDS);
+                LogicalSchema s = ((LOLoad) op).getSchema();
+                Set<Integer> required = getColumns(s, uids);
+
+                // only loads that can actually drop a column are worth reporting
+                if (required.size() < s.size()) {
+                    op.annotate(REQUIREDCOLS, required);
+                    found = true;
+                }
+            }
+        }
+
+        return found;
+    }
+
+    /**
+     * Translates a set of uids into the indexes of the matching columns of a schema.
+     *
+     * @param schema schema in which the uids are looked up
+     * @param uids uids of the wanted fields
+     * @return the column indexes of the given uids
+     * @throws SchemaNotDefinedException if the schema is null
+     * @throws IOException if a uid is not found in the schema
+     */
+    protected Set<Integer> getColumns(LogicalSchema schema, Set<Long> uids) throws IOException {
+        if (schema == null) {
+            throw new SchemaNotDefinedException("Schema is not defined.");
+        }
+
+        Set<Integer> indexes = new HashSet<Integer>();
+        for (long uid : uids) {
+            int position = schema.findField(uid);
+            if (position == -1) {
+                throw new IOException("UID " + uid + " is not found in the schema");
+            }
+            indexes.add(position);
+        }
+
+        return indexes;
+    }
+    
+    /**
+     * Returns the sub-plan that check() built and annotated.
+     *
+     * @return the sub-plan, or null if check() has not built one
+     */
+    public OperatorPlan reportChanges() {
+        return subPlan;
+    }
+   
+    // Visitor that calculates the input and output uids of each operator.
+    // It doesn't change the plan; it only stores the calculated information
+    // as annotations on the operators.
+    // The input and output uids are not necessarily the top-level uids of
+    // a schema. They may be the uids of lower-level fields of complex fields
+    // that have their own schema.
+    private class ColumnDependencyVisitor extends LogicalPlanVisitor {    	
+        
+        /**
+         * Creates a visitor that walks the given plan with a
+         * ReverseDependencyOrderWalker.
+         *
+         * @param plan plan whose operators are to be annotated
+         */
+        public ColumnDependencyVisitor(OperatorPlan plan) {
+            super(plan, new ReverseDependencyOrderWalker(plan));            
+        }
+        
+        /**
+         * Annotates a load: its input uids are the same set as its output uids.
+         *
+         * @param load the load operator to annotate
+         * @throws IOException propagated from the output uid calculation
+         */
+        public void visitLOLoad(LOLoad load) throws IOException {
+            load.annotate(INPUTUIDS, setOutputUids(load));
+        }
+
+        /**
+         * Annotates a filter: its input uids are its output uids plus every
+         * uid projected by the filter condition.
+         *
+         * @param filter the filter operator to annotate
+         * @throws IOException propagated from the uid calculation
+         */
+        public void visitLOFilter(LOFilter filter) throws IOException {
+            // start from everything the successors need ...
+            Set<Long> needed = new HashSet<Long>(setOutputUids(filter));
+
+            // ... and add the columns the condition itself reads
+            collectUids(filter, filter.getFilterPlan(), needed);
+
+            filter.annotate(INPUTUIDS, needed);
+        }
+        
+        /**
+         * Annotates a store: its input uids are the same set as its output uids.
+         * When the calculated output set is empty (the load-store-load-store
+         * case), the uids of all fields of the store's schema are used instead.
+         *
+         * @param store the store operator to annotate
+         * @throws SchemaNotDefinedException if the store has no schema
+         * @throws IOException propagated from the output uid calculation
+         */
+        public void visitLOStore(LOStore store) throws IOException {
+            Set<Long> uids = setOutputUids(store);
+
+            // to deal with load-store-load-store case
+            if (uids.isEmpty()) {
+                LogicalSchema schema = store.getSchema();
+                if (schema == null) {
+                    throw new SchemaNotDefinedException("Schema for " + store.getName() + " is not defined.");
+                }
+
+                for (LogicalFieldSchema field : schema.getFields()) {
+                    uids.add(field.uid);
+                }
+            }
+
+            // for store, input uids are same as output uids
+            store.annotate(INPUTUIDS, uids);
+        }
+        
+        /**
+         * Annotates a join: its input uids are its output uids plus every
+         * uid projected by the join expressions.
+         *
+         * @param join the join operator to annotate
+         * @throws IOException propagated from the uid calculation
+         */
+        public void visitLOJoin(LOJoin join) throws IOException {
+            Set<Long> needed = new HashSet<Long>(setOutputUids(join));
+
+            for (LogicalExpressionPlan keyPlan : join.getExpressionPlans()) {
+                collectUids(join, keyPlan, needed);
+            }
+
+            join.annotate(INPUTUIDS, needed);
+        }
+        
+        /**
+         * Annotates a cogroup: its input uids are its output uids, every uid
+         * projected by the cogroup expressions and, for each generated uid
+         * that is part of the output, all uids of the schema of the
+         * predecessor it was generated from.
+         *
+         * @param cg the cogroup operator to annotate
+         * @throws IOException propagated from the uid calculation
+         */
+        @Override
+        public void visitLOCogroup(LOCogroup cg) throws IOException {
+            Set<Long> produced = setOutputUids(cg);
+            Set<Long> needed = new HashSet<Long>(produced);
+
+            // the keys on which the cogroup is done are always required
+            for( LogicalExpressionPlan keyPlan : cg.getExpressionPlans().values() ) {
+                collectUids(cg, keyPlan, needed);
+            }
+
+            // an output uid generated by the cogroup requires the full
+            // schema of the input it was generated from
+            for( Map.Entry<Integer, Long> generated : cg.getGeneratedInputUids().entrySet() ) {
+                if( !produced.contains( generated.getValue() ) ) {
+                    continue;
+                }
+                LogicalRelationalOperator source =
+                    (LogicalRelationalOperator) cg.getPlan().getPredecessors(cg).get(generated.getKey());
+                needed.addAll( getAllUids( source.getSchema() ) );
+            }
+
+            cg.annotate(INPUTUIDS, needed);
+        }
+        
+        /**
+         * Collects the uid of every field of the given schema, descending
+         * into the nested schemas of tuple and bag fields.
+         *
+         * @param schema schema to scan; may be null
+         * @return the uids found; empty when the schema is null
+         */
+        private Set<Long> getAllUids( LogicalSchema schema ) {
+            Set<Long> collected = new HashSet<Long>();
+
+            if( schema != null ) {
+                for( LogicalFieldSchema f : schema.getFields() ) {
+                    collected.add( f.uid );
+
+                    boolean complex = f.type == DataType.TUPLE || f.type == DataType.BAG;
+                    if( complex && f.schema != null ) {
+                        collected.addAll( getAllUids( f.schema ) );
+                    }
+                }
+            }
+
+            return collected;
+        }
+        
+        /**
+         * Annotates a foreach. Its output uids are handed to the generate
+         * operator of the inner plan, a nested visitor annotates the inner
+         * plan, and the input uids of the foreach become the union of the
+         * input uids of the inner plan's sources.
+         *
+         * @param foreach the foreach operator to annotate
+         * @throws IOException propagated from the uid calculation
+         */
+        @SuppressWarnings("unchecked")
+        public void visitLOForEach(LOForEach foreach) throws IOException {
+            Set<Long> produced = setOutputUids(foreach);
+
+            // the generate at the sink of the inner plan must produce these uids
+            LogicalPlan inner = foreach.getInnerPlan();
+            LOGenerate generate = (LOGenerate)inner.getSinks().get(0);
+            generate.annotate(OUTPUTUIDS, produced);
+
+            // work out what the inner plan needs in order to produce them
+            new ColumnDependencyVisitor(inner).visit();
+
+            Set<Long> needed = new HashSet<Long>();
+            for (Operator innerSource : inner.getSources()) {
+                Set<Long> wanted = (Set<Long>)innerSource.getAnnotation(INPUTUIDS);
+                if (wanted != null) {
+                    needed.addAll(wanted);
+                }
+            }
+
+            foreach.annotate(INPUTUIDS, needed);
+        }
+
+        @SuppressWarnings("unchecked")
+        public void visitLOGenerate(LOGenerate gen) throws IOException {
+             Set<Long> output = (Set<Long>)gen.getAnnotation(OUTPUTUIDS);
+            
+             Set<Long> input = new HashSet<Long>();
+             
+             List<LogicalExpressionPlan> ll = gen.getOutputPlans();
+             
+             Iterator<Long> iter = output.iterator();
+             while(iter.hasNext()) {
+                 long uid = iter.next();
+                 boolean found = false;
+                 for(int i=0; i<ll.size(); i++) {
+                     LogicalExpressionPlan exp = ll.get(i);
+                     LogicalExpression op = (LogicalExpression)exp.getSources().get(0);
+                     
+                     if (op.getUid() == uid) {
+                         collectUids(gen, exp, input);                         
+                         found = true;
+                        
+                     } else if (op instanceof ProjectExpression && ((ProjectExpression)op).isProjectStar()) {
+                         int inputNum = ((ProjectExpression)op).getInputNum();                            	
+                         LogicalRelationalOperator pred = (LogicalRelationalOperator)gen.getPlan().getPredecessors(gen).get(inputNum);
+                         
+                         if (pred.getSchema() == null) {
+                             throw new SchemaNotDefinedException("Schema for " + pred.getName() + " is not defined.");
+                         }
+                         for(LogicalFieldSchema f: pred.getSchema().getFields()) {
+                             if (f.uid == uid) {
+                                 input.add(uid);
+                                 found = true;
+                             }
+                         }
+                         
+                     } else if (gen.getFlattenFlags()[i]) {
+                         // if uid equal to the expression, get all uids of original projections
+                         List<Operator> ss = exp.getSinks();
+                         for(Operator s: ss) {
+                             if (s instanceof ProjectExpression) {
+                                 int inputNum = ((ProjectExpression)s).getInputNum();                            	
+                                 LogicalRelationalOperator pred = (LogicalRelationalOperator)gen.getPlan().getPredecessors(gen).get(inputNum);
+                                 
+                                 if (pred.getSchema() == null) {
+                                     throw new SchemaNotDefinedException("Schema for " + pred.getName() + " is not defined.");
+                                 }
+                                 if (pred.getSchema().findField(uid) != -1) {                                    
+                                     input.add(uid);                                    
+                                     found = true;
+                                 }
+                             }
+                         }
+                     }
+                     
+                     if (found) {
+                         break;
+                     }
+                 }
+                 
+                 if (!found) {                    
+                     throw new IOException("uid " + uid +" is not in the schema of LOForEach");                    
+                 }
+             }
+              
+             // for the flatten bag, we need to make sure at least one field is in the input
+             for(int i=0; i<ll.size(); i++) {
+                 if (!gen.getFlattenFlags()[i]) {
+                     continue;
+                 }
+                 
+                 LogicalExpressionPlan exp = ll.get(i);
+                 Operator s = exp.getSinks().get(0);
+                 
+                 if (s instanceof ProjectExpression) {
+                     int inputNum = ((ProjectExpression)s).getInputNum();       
+                     int colNum = ((ProjectExpression)s).getColNum();       
+                     LogicalRelationalOperator pred = (LogicalRelationalOperator)gen.getPlan().getPredecessors(gen).get(inputNum);
+                     
+                     LogicalSchema predSchema = pred.getSchema();
+                     if (predSchema == null) {
+                         throw new SchemaNotDefinedException("Schema for " + pred.getName() + " is not defined.");
+                     }
+                     
+                     if (predSchema.getField(colNum).type == DataType.BAG) {
+                         long fuid = predSchema.getField(colNum).uid;
+                         LogicalSchema fschema = predSchema.getField(colNum).schema;
+                         if (input.contains(fuid)) {
+                             continue;
+                         }
+                         
+                         if (fschema == null) {
+                             input.add(fuid);
+                         }
+
+                         boolean found = false;
+                         for(long uid: input) {
+                             if (fschema.findField(uid) != -1) {
+                                 found = true;
+                                 break;
+                             }
+                         }
+                         
+                         // if the input uids doesn't contain any field from this bag, then add the first field
+                         if (!found) {
+                             input.add(fschema.getField(0).uid);
+                         }
+                     }
+                 }                                 	 
+             }
+             
+             gen.annotate(INPUTUIDS, input);
+        }
+        
+        /**
+         * Annotates an inner load: its input uids are the same set as its
+         * output uids.
+         *
+         * @param load the inner load operator to annotate
+         * @throws IOException propagated from the output uid calculation
+         */
+        public void visitLOInnerLoad(LOInnerLoad load) throws IOException {
+            load.annotate(INPUTUIDS, setOutputUids(load));
+        }
+        
+        /**
+         * Adds to the given set the uids referenced by the projections at the
+         * sinks of an expression plan. A plain projection contributes its own
+         * uid; a project-star contributes the uid of every field in the schema
+         * of the operator it refers to.
+         *
+         * @param currentOp operator owning the expression plan, used to resolve
+         *        the referent of a project-star
+         * @param exp expression plan to inspect
+         * @param uids set that receives the uids
+         * @throws SchemaNotDefinedException if the referent of a project-star
+         *         has no schema
+         * @throws IOException propagated from the referent lookup
+         */
+        private void collectUids(LogicalRelationalOperator currentOp, LogicalExpressionPlan exp, Set<Long> uids) throws IOException {
+            for (Operator sink : exp.getSinks()) {
+                if (!(sink instanceof ProjectExpression)) {
+                    continue;
+                }
+
+                ProjectExpression projection = (ProjectExpression)sink;
+                if (projection.isProjectStar()) {
+                    LogicalRelationalOperator referent = projection.findReferent(currentOp);
+                    LogicalSchema schema = referent.getSchema();
+                    if (schema == null) {
+                        throw new SchemaNotDefinedException("Schema not defined for " + referent.getAlias());
+                    }
+                    for (LogicalFieldSchema field : schema.getFields()) {
+                        uids.add(field.uid);
+                    }
+                } else {
+                    long uid = projection.getUid();
+                    uids.add(uid);
+                }
+            }
+        }
+        
+        
+        /**
+         * Calculates the output uids of an operator and stores them as its
+         * OUTPUTUIDS annotation. When the operator has successors, the output
+         * uids are those input uids of the successors that are found in the
+         * operator's schema; otherwise they are the uids of all fields of
+         * its schema.
+         *
+         * @param op operator to annotate
+         * @return the set that was stored as the annotation
+         * @throws SchemaNotDefinedException if the operator has no schema
+         * @throws IOException propagated from the plan or schema lookup
+         */
+        @SuppressWarnings("unchecked")
+        private Set<Long> setOutputUids(LogicalRelationalOperator op) throws IOException {
+            List<Operator> successors = plan.getSuccessors(op);
+            Set<Long> result = new HashSet<Long>();
+
+            LogicalSchema schema = op.getSchema();
+            if (schema == null) {
+                throw new SchemaNotDefinedException("Schema for " + op.getName() + " is not defined.");
+            }
+
+            if (successors == null) {
+                // if it's leaf, set to its schema
+                for (LogicalFieldSchema field : schema.getFields()) {
+                    result.add(field.uid);
+                }
+            } else {
+                // if this is not sink, the output uids are union of input uids of its successors
+                for (Operator succ : successors) {
+                    Set<Long> wanted = (Set<Long>)succ.getAnnotation(INPUTUIDS);
+                    if (wanted == null) {
+                        continue;
+                    }
+                    for (long uid : wanted) {
+                        if (schema.findField(uid) != -1) {
+                            result.add(uid);
+                        }
+                    }
+                }
+            }
+
+            op.annotate(OUTPUTUIDS, result);
+            return result;
+        }
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java Mon Mar 15 03:28:27 2010
@@ -67,10 +67,10 @@ public class FilterAboveForeach extends 
 
     @Override
     public Transformer getNewTransformer() {
-        return new FilterAboveFlattenTransformer();
+        return new FilterAboveForEachTransformer();
     }
     
-    public class FilterAboveFlattenTransformer extends Transformer {
+    public class FilterAboveForEachTransformer extends Transformer {
 
         LOFilter filter = null;
         LOForEach foreach = null;
@@ -111,11 +111,6 @@ public class FilterAboveForeach extends 
                 for(int j=0; j< preds.size(); j++) {
                     LogicalRelationalOperator logRelOp = (LogicalRelationalOperator)preds.get(j);
                     if (hasAll( logRelOp, uids) ) {
-                        // If any of the uids are of complex type then we 
-                        // cannot think about moving this filter.
-                        if( containsComplexType(logRelOp.getSchema(), uids ) ) {
-                            break;
-                        }
                         forEachPred = (LogicalRelationalOperator) preds.get(j);
                         return true;
                     }
@@ -161,39 +156,32 @@ public class FilterAboveForeach extends 
          * @param uids Uids to check for
          * @return true if given LogicalRelationalOperator has all the given uids
          */
-        private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {
-            LogicalSchema schema = op.getSchema();
-            List<LogicalSchema.LogicalFieldSchema> fields = schema.getFields();
-            Set<Long> all = new HashSet<Long>();
-            for(LogicalSchema.LogicalFieldSchema f:fields) {
-                all.add(f.uid);
-            }
+        private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {                        
+            Set<Long> all = getAllProjectableUids(op.getSchema());            
             return all.containsAll(uids);
         }
         
-        /**
-         * This function checks if any of the fields mentioned are a Bug or Tuple.
-         * If so we cannot move the filter above the operator having the schema
-         * @param schema Schema of the operator we are investigating
-         * @param uids Uids of the fields we are checking for
-         * @return true if one of the uid belong to a complex type
+        /*
+         * The projectable set of uids consists of the uids of all fields whose type is not a bag
          */
-        private boolean containsComplexType(LogicalSchema schema, Set<Long> uids) {
-            List<LogicalSchema.LogicalFieldSchema> fields = schema.getFields();
-
-            for(LogicalSchema.LogicalFieldSchema f:fields) {
-                if ( ( f.type == DataType.BAG || f.type == DataType.TUPLE ) ) {
-                    if( uids.contains( f.uid ) ) {
-                        return true;
-                    }
-                    if( f.schema != null && containsComplexType(f.schema, uids) ) {
-                        return true;
-                    }
+        private Set<Long> getAllProjectableUids( LogicalSchema schema ) {
+            Set<Long> uids = new HashSet<Long>();
+            
+            if( schema == null ) {
+                return uids;
+            }
+            
+            for( LogicalSchema.LogicalFieldSchema field : schema.getFields() ) {
+                if( field.type == DataType.TUPLE ) {
+                    uids.addAll( getAllProjectableUids(field.schema ) );
+                }
+                if( field.type != DataType.BAG ) {
+                    uids.add( field.uid );
                 }
             }
-            return false;
+            return uids;
         }
-
+        
         @Override
         public OperatorPlan reportChanges() {            
             return subPlan;

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MapKeysPruneHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MapKeysPruneHelper.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MapKeysPruneHelper.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MapKeysPruneHelper.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.experimental.logical.expression.MapLookupExpression;
+import org.apache.pig.experimental.logical.optimizer.AllExpressionVisitor;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.experimental.plan.DependencyOrderWalker;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.ReverseDependencyOrderWalker;
+
+/**
+ * This helper marks every load operator which has a map 
+ * with the REQUIRED_MAPKEYS annotation. The annotation value is 
+ * <code>Map<Integer,Set<String>></code> where Integer is the column number 
+ * of the field and Set is the set of keys needed from this field ( field is a map field only ).
+ * 
+ * It does this for only the top level schema in load. 
+ * 
+ * Algorithm:
+ *  Traverse the Plan in ReverseDependency order ( ie. Sink to Source )
+ *      For LogicalRelationalOperators having MapLookupExpression in their 
+ *          expressionPlan collect uid and keys related to it. This is
+ *          retained in the visitor
+ *      For ForEach having nested LogicalPlan use the same visitor hence
+ *          there is no distinction required
+ *      At Sources find all the uids provided by this source and annotate this 
+ *      LogicalRelationalOperator ( load ) with <code>Map<Integer,Set<String>></code>
+ *      containing only the column numbers that this LogicalRelationalOperator generates
+ *      
+ * NOTE: This is a simple Map Pruner. If a map key is mentioned in the script
+ *      then this pruner assumes you need the key. This pruner is not as optimized
+ *      as column pruner ( which removes a column if it is mentioned but never used )
+ *
+ */
+public class MapKeysPruneHelper {
+
+    public static final String REQUIRED_MAPKEYS = "MapPruner:RequiredKeys";
+    
+    private OperatorPlan currentPlan;
+    private OperatorSubPlan subplan;
+    
+    public MapKeysPruneHelper(OperatorPlan currentPlan) {
+        this.currentPlan = currentPlan;
+        
+        if (currentPlan instanceof OperatorSubPlan) {
+            subplan = new OperatorSubPlan(((OperatorSubPlan)currentPlan).getBasePlan());
+        } else {
+            subplan = new OperatorSubPlan(currentPlan);
+        }
+    }
+  
+
+    @SuppressWarnings("unchecked")
+    public boolean check() throws IOException {       
+        
+        // First check if we have a load with a map in it or not
+        List<Operator> sources = currentPlan.getSources();
+        
+        boolean hasMap = false;
+        for( Operator source : sources ) {
+            LogicalSchema schema = ((LogicalRelationalOperator)source).getSchema();
+            // If any of the loads has a null schema we dont know the ramifications here
+            // so we skip this optimization
+            if( schema == null ) {
+                return false;
+            }
+            if( hasMap( schema ) ) {
+                hasMap = true;
+            }
+        }
+                    
+        // We dont have any map in the first level of schema
+        if( !hasMap ) {
+            return false;
+        }
+        
+        
+        // Now we check what keys are needed
+        MapMarker marker = new MapMarker(currentPlan);
+        marker.visit();
+        
+        // Get all Uids from Sinks
+        List<Operator> sinks = currentPlan.getSinks();
+        Set<Long> sinkMapUids = new HashSet<Long>();
+        for( Operator sink : sinks ) {
+            LogicalSchema schema = ((LogicalRelationalOperator)sink).getSchema();
+            sinkMapUids.addAll( getMapUids( schema ) );
+        }
+        
+        
+        // If we have found specific keys which are needed then we return true;
+        // Else if we dont have any specific keys we return false
+        boolean hasAnnotation = false;
+        for( Operator source : sources ) {
+            Map<Integer,Set<String>> annotationValue = 
+                (Map<Integer, Set<String>>) ((LogicalRelationalOperator)source).getAnnotation(REQUIRED_MAPKEYS);
+            
+            // A map that reaches a sink whole is needed in full, so it cannot be pruned at the source
+            if( ! sinkMapUids.isEmpty() && annotationValue != null && 
+                    !annotationValue.isEmpty() ) {
+                Integer[] annotationKeyArray = annotationValue.keySet().toArray( new Integer[0] );
+                LogicalSchema sourceSchema = ((LogicalRelationalOperator)source).getSchema();
+                for( Integer col : annotationKeyArray ) {                	
+                    if( sinkMapUids.contains(sourceSchema.getField(col).uid)) {
+                        annotationValue.remove( col );
+                    }
+                }
+            }
+            
+            if ( annotationValue != null && annotationValue.isEmpty()) {
+                ((LogicalRelationalOperator)source).removeAnnotation(REQUIRED_MAPKEYS);
+                annotationValue = null;
+            }
+            
+            // Can we still prune any keys
+            if( annotationValue != null ) {
+                hasAnnotation = true;
+                subplan.add(source);
+            }
+        }
+        
+        // True only if at least one source still has map keys that can be pruned
+        return hasAnnotation;
+    }
+    
+    /**
+     * This function checks if the schema has a map.
+     * We don't check for a nested structure.
+     * @param schema Schema to be checked
+     * @return true if it has a map, else false
+     * @throws NullPointerException in case schema is null
+     */
+    private boolean hasMap(LogicalSchema schema ) throws NullPointerException {
+        for( LogicalFieldSchema field : schema.getFields() ) {
+            if( field.type == DataType.MAP ) {
+                return true;
+            }
+        }
+        return false;
+    }
+    
+    /**
+     * This function returns a set of Uids corresponding to
+     * map datatype in the first level of this schema
+     * @param schema Schema having fields
+     * @return uids of the first-level map fields; an empty set if schema is null or has no map field
+     */
+    private Set<Long> getMapUids(LogicalSchema schema ) {
+        Set<Long> uids = new HashSet<Long>();
+        if( schema != null ) {
+            for( LogicalFieldSchema field : schema.getFields() ) {
+                if( field.type == DataType.MAP ) {
+                    uids.add( field.uid );
+                }
+            }
+        }
+        return uids;
+    }
+
+    public OperatorPlan reportChanges() {
+        return subplan;
+    }
+
+      
+    /**
+     * This class collects all the information required to create
+     * the list of keys required for a map
+     */
+    static public class MapMarker extends AllExpressionVisitor {
+        
+        Map<Long,Set<String>> inputUids = null;
+
+        protected MapMarker(OperatorPlan plan) {
+            super(plan, new ReverseDependencyOrderWalker(plan));
+            inputUids = new HashMap<Long,Set<String>>();
+        }
+        
+        @Override
+        public void visitLOLoad(LOLoad load) throws IOException {
+            if( load.getSchema() != null ) {
+                Map<Integer,Set<String>> annotation = new HashMap<Integer,Set<String>>();
+                for( int i=0; i<load.getSchema().size(); i++) {
+                    LogicalFieldSchema field = load.getSchema().getField(i);
+                    if( inputUids.containsKey( field.uid ) ) {
+                        annotation.put(i, inputUids.get( field.uid ) );
+                    }
+                }
+                load.annotate(REQUIRED_MAPKEYS, annotation);
+            }
+        }
+
+        @Override
+        public void visitLOFilter(LOFilter filter) throws IOException {
+            currentOp = filter;
+            MapExprMarker v = (MapExprMarker) getVisitor(filter.getFilterPlan());
+            v.visit();
+            mergeUidKeys( v.inputUids );
+        }
+        
+        @Override
+        public void visitLOJoin(LOJoin join) throws IOException {
+            currentOp = join;
+            Collection<LogicalExpressionPlan> c = join.getExpressionPlans();
+            for (LogicalExpressionPlan plan : c) {
+                MapExprMarker v = (MapExprMarker) getVisitor(plan);
+                v.visit();
+                mergeUidKeys( v.inputUids );
+            }
+        }
+        
+        @Override
+        public void visitLOGenerate(LOGenerate gen) throws IOException {
+            currentOp = gen;
+            Collection<LogicalExpressionPlan> plans = gen.getOutputPlans();
+            for( LogicalExpressionPlan plan : plans ) {
+                MapExprMarker v = (MapExprMarker) getVisitor(plan);
+                v.visit();
+                mergeUidKeys( v.inputUids );
+            }
+        }
+        
+        private void mergeUidKeys( Map<Long, Set<String> > inputMap ) {
+            for( Map.Entry<Long, Set<String>> entry : inputMap.entrySet() ) {
+                if( inputUids.containsKey(entry.getKey()) ) {
+                    Set<String> mapKeySet = inputUids.get(entry.getKey());
+                    mapKeySet.addAll(entry.getValue());
+                } else {
+                    inputUids.put(entry.getKey(), inputMap.get(entry.getKey()));
+                }
+            }
+        }
+
+        @Override
+        protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
+            return new MapExprMarker(expr );
+        }
+        
+        static class MapExprMarker extends LogicalExpressionVisitor {
+
+            Map<Long,Set<String>> inputUids = null;
+            
+            protected MapExprMarker(OperatorPlan p) {
+                super(p, new DependencyOrderWalker(p));
+                inputUids = new HashMap<Long,Set<String>>();
+            }
+
+            public void visitMapLookup(MapLookupExpression op) throws IOException {
+                Long uid = op.getMap().getUid();
+                String key = op.getLookupKey();
+                
+                HashSet<String> mapKeySet = null;
+                if( inputUids.containsKey(uid) ) {
+                    mapKeySet = (HashSet<String>) inputUids.get(uid);                                        
+                } else {
+                    mapKeySet = new HashSet<String>();
+                    inputUids.put(uid, mapKeySet);
+                }
+                mapKeySet.add(key);
+            }
+        }
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java Mon Mar 15 03:28:27 2010
@@ -172,13 +172,13 @@ public class PushUpFilter extends Rule {
         // check if a relational operator contains all of the specified uids
         private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {
             LogicalSchema schema = op.getSchema();
-            List<LogicalSchema.LogicalFieldSchema> fields = schema.getFields();
-            Set<Long> all = new HashSet<Long>();
-            for(LogicalSchema.LogicalFieldSchema f:fields) {
-                all.add(f.uid);
+            for(long uid: uids) {
+                if (schema.findField(uid) == -1) {
+                    return false;
+                }
             }
             
-            return all.containsAll(uids);
+            return true;
         }
            
         @Override

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/WholePlanRule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/WholePlanRule.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/WholePlanRule.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/WholePlanRule.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+
+/**
+ * Super class for all rules that operate on the whole plan. Such a rule doesn't look for
+ * a specific pattern. An example of such a rule is ColumnPrune.
+ *
+ */
+public abstract class WholePlanRule extends Rule {
+
+    public WholePlanRule(String n) {
+        super(n);
+    }
+
+    public List<OperatorPlan> match(OperatorPlan plan) {
+        currentPlan = plan;
+        List<OperatorPlan> ll = new ArrayList<OperatorPlan>();
+        ll.add(plan);
+        return ll;
+    }
+    
+    @Override
+    protected OperatorPlan buildPattern() {
+        return null;
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java Mon Mar 15 03:28:27 2010
@@ -18,6 +18,7 @@
 
 package org.apache.pig.experimental.plan;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
@@ -271,5 +272,17 @@ public abstract class BaseOperatorPlan i
 
         PlanPrinter npp = new PlanPrinter(this, ps);
         npp.visit();
-}
+    }
+    
+    @Override
+    public String toString() {
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(os);
+        try {
+            explain(ps,"",false);
+        } catch (IOException e) {
+            return "";
+        }
+        return os.toString();
+    }   
 }

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalColumnPrune.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalColumnPrune.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalColumnPrune.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalColumnPrune.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.util.*;
+
+import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.experimental.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.experimental.logical.optimizer.UidStamper;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.rules.AddForEach;
+import org.apache.pig.experimental.logical.rules.ColumnMapKeyPrune;
+import org.apache.pig.experimental.logical.rules.MapKeysPruneHelper;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.optimizer.PlanOptimizer;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.test.utils.LogicalPlanTester;
+
+import junit.framework.TestCase;
+
+public class TestExperimentalColumnPrune extends TestCase {
+
+    LogicalPlan plan = null;
+  
+    private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{
+        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);        
+        visitor.visit();
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+        
+        try {
+            UidStamper stamper = new UidStamper(newPlan);
+            stamper.visit();
+            
+            return newPlan;
+        }catch(Exception e) {
+            throw new VisitorException(e);
+        }
+    }
+    
+   
+    public void testNoPrune() throws Exception  {
+        // no foreach
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+        lpt.buildPlan("b = filter a by v1==NULL;");        
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store b into 'empty';");  
+        LogicalPlan newLogicalPlan = migratePlan(plan);
+               
+        PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+        lpt.buildPlan("b = filter a by v1==NULL;");        
+        plan = lpt.buildPlan("store b into 'empty';");  
+        LogicalPlan expected = migratePlan(plan);
+        
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // no schema
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt';");
+        lpt.buildPlan("b = foreach a generate $0, $1;");
+        plan = lpt.buildPlan("store b into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt';");
+        lpt.buildPlan("b = foreach a generate $0, $1;");
+        plan = lpt.buildPlan("store b into 'empty';");  
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+    }
+       
+    public void testPrune() throws Exception  {
+        // only foreach
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+        lpt.buildPlan("b = foreach a generate id;");        
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store b into 'empty';");  
+        LogicalPlan newLogicalPlan = migratePlan(plan);
+               
+        PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id);");
+        lpt.buildPlan("b = foreach a generate id;");        
+        plan = lpt.buildPlan("store b into 'empty';");  
+        LogicalPlan expected = migratePlan(plan);
+        
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // with filter
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
+        lpt.buildPlan("b = filter a by v1 != NULL AND (v2+v3)<100;");
+        lpt.buildPlan("c = foreach b generate id;");
+        plan = lpt.buildPlan("store c into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v3, v2);");
+        lpt.buildPlan("b = filter a by v1 != NULL AND (v2+v3)<100;");
+        lpt.buildPlan("c = foreach b generate id;");
+        plan = lpt.buildPlan("store c into 'empty';"); 
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // with 2 foreach
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
+        lpt.buildPlan("b = foreach a generate v2, v5, v4;");
+        lpt.buildPlan("c = foreach b generate v5, v4;");
+        plan = lpt.buildPlan("store c into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (v5, v4);");
+        lpt.buildPlan("b = foreach a generate v5, v4;");
+        lpt.buildPlan("c = foreach b generate v5, v4;");
+        plan = lpt.buildPlan("store c into 'empty';"); 
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // with 2 foreach
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
+        lpt.buildPlan("b = foreach a generate id, v1, v5, v3, v4;");
+        lpt.buildPlan("c = foreach b generate v5, v4;");
+        plan = lpt.buildPlan("store c into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (v5, v4);");
+        lpt.buildPlan("b = foreach a generate v5, v4;");
+        lpt.buildPlan("c = foreach b generate v5, v4;");
+        plan = lpt.buildPlan("store c into 'empty';"); 
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // with 2 foreach and filter in between
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
+        lpt.buildPlan("b = foreach a generate v2, v5, v4;");
+        lpt.buildPlan("c = filter b by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5, v4;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (v5, v4, v2);");
+        lpt.buildPlan("b = foreach a generate v2, v5, v4;");
+        lpt.buildPlan("c = filter b by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5, v4;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // with foreach after join of 2 loads
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v2, v3);");
+        lpt.buildPlan("b = load 'c.txt' as (id, v4, v5, v6);");
+        lpt.buildPlan("c = join a by id, b by id;");       
+        lpt.buildPlan("d = foreach c generate a::id, v5, v3, v4;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, v3);");
+        lpt.buildPlan("b = load 'c.txt' as (id, v4, v5);");
+        lpt.buildPlan("c = join a by id, b by id;");       
+        lpt.buildPlan("d = foreach c generate a::id, v5, v3, v4;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // with BinStorage, insert foreach after load
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");        
+        lpt.buildPlan("c = filter a by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5, v4;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
+        lpt.buildPlan("b = foreach a generate v5, v4, v2;");
+        lpt.buildPlan("c = filter b by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5, v4;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+       // with BinStorage, not to insert foreach after load if there is already one
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");    
+        lpt.buildPlan("b = foreach a generate v5, v4, v2;");
+        lpt.buildPlan("c = filter b by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
+        lpt.buildPlan("b = foreach a generate v5, v2;");
+        lpt.buildPlan("c = filter b by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+       // with BinStorage, not to insert foreach after load if there is already one
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");    
+        lpt.buildPlan("b = foreach a generate v5, v4, v2, 10;");
+        lpt.buildPlan("c = filter b by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
+        lpt.buildPlan("b = foreach a generate v5, v2, 10;");
+        lpt.buildPlan("c = filter b by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+    }
+    
+    /**
+     * Checks column pruning together with map-key tracking.
+     *
+     * <p>Two scripts are optimized with {@code MyPlanOptimizer} (3 iterations) and
+     * compared against hand-written expected plans in which the unused column
+     * {@code v1} is absent from the load schema. After each comparison the
+     * {@code MapKeysPruneHelper.REQUIRED_MAPKEYS} annotation on the load operators
+     * is inspected:
+     * <ul>
+     *   <li>a load whose map is only accessed through {@code m#'path'} must carry
+     *       exactly one entry, keyed by 2, containing {@code "path"};</li>
+     *   <li>a load whose map is also projected as a whole ({@code a::m}) must carry
+     *       no annotation.</li>
+     * </ul>
+     */
+    @SuppressWarnings("unchecked") // the annotation value is cast to Map<Integer, Set<String>> below
+    public void testPruneWithMapKey() throws Exception {
+        // Scenario 1: only a foreach, which reads id and m#'path'.
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, m:map[]);");
+        lpt.buildPlan("b = foreach a generate id, m#'path';");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+        LogicalPlan newLogicalPlan = migratePlan(plan);
+
+        PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+
+        // Expected plan: the same script with v1 removed from the load schema.
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, m:map[]);");
+        lpt.buildPlan("b = foreach a generate id, m#'path';");
+        plan = lpt.buildPlan("store b into 'empty';");
+        LogicalPlan expected = migratePlan(plan);
+
+        assertTrue(expected.isEqual(newLogicalPlan));
+
+        LOLoad op = (LOLoad)newLogicalPlan.getSources().get(0);
+        Map<Integer,Set<String>> annotation =
+                (Map<Integer, Set<String>>) op.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+        // Fail with a readable assertion instead of a NullPointerException.
+        assertNotNull("load is missing the required-map-keys annotation", annotation);
+        assertEquals(1, annotation.size());
+        Set<String> s = new HashSet<String>();
+        s.add("path");
+        assertEquals(s, annotation.get(2));
+
+        // Scenario 2: foreach on top of a join; a::m is used as a whole, b::m only by key.
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, m:map[]);");
+        lpt.buildPlan("b = load 'd.txt' as (id, v1, m:map[]);");
+        lpt.buildPlan("c = join a by id, b by id;");
+        lpt.buildPlan("d = filter c by a::m#'path' != NULL;");
+        lpt.buildPlan("e = foreach d generate a::id, b::id, b::m#'path', a::m;");
+        plan = lpt.buildPlan("store e into 'empty';");
+        newLogicalPlan = migratePlan(plan);
+
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+
+        // Expected plan: v1 removed from both load schemas.
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, m:map[]);");
+        lpt.buildPlan("b = load 'd.txt' as (id, m:map[]);");
+        lpt.buildPlan("c = join a by id, b by id;");
+        lpt.buildPlan("d = filter c by a::m#'path' != NULL;");
+        lpt.buildPlan("e = foreach d generate a::id, b::id, b::m#'path', a::m;");
+        plan = lpt.buildPlan("store e into 'empty';");
+        expected = migratePlan(plan);
+
+        assertTrue(expected.isEqual(newLogicalPlan));
+
+        List<Operator> ll = newLogicalPlan.getSources();
+        assertEquals(2, ll.size());
+
+        // Pick out the two loads by alias; the order of getSources() is not relied upon.
+        LOLoad loada = null;
+        LOLoad loadb = null;
+        for (Operator opp : ll) {
+            LogicalRelationalOperator rel = (LogicalRelationalOperator) opp;
+            // Constant-first equals so an operator without an alias cannot cause an NPE.
+            if ("a".equals(rel.getAlias())) {
+                loada = (LOLoad) opp;
+            } else if ("b".equals(rel.getAlias())) {
+                loadb = (LOLoad) opp;
+            }
+        }
+        assertNotNull("no source operator with alias 'a'", loada);
+        assertNotNull("no source operator with alias 'b'", loadb);
+
+        // a::m is projected in full, so no key restriction may be recorded for load a.
+        annotation =
+                (Map<Integer, Set<String>>) loada.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+        assertNull(annotation);
+
+        // b::m is only read through key 'path'.
+        annotation =
+                (Map<Integer, Set<String>>) loadb.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+        assertNotNull("load b is missing the required-map-keys annotation", annotation);
+        assertEquals(1, annotation.size());
+
+        s = new HashSet<String>();
+        s.add("path");
+        assertEquals(s, annotation.get(2));
+    }
+    
+    /**
+     * Runs the optimizer over a script that filters, flattens a bag column and then
+     * projects one field of the flattened bag, and expects the result to be equal to
+     * the plan migrated from that very same script.
+     */
+    public void testPruneWithBag() throws Exception  {
+        // Statements shared by the optimized plan and the expected plan.
+        final String[] script = {
+            "a = load 'd.txt' as (id, v:bag{t:(s1,s2,s3)});",
+            "b = filter a by id>10;",
+            "c = foreach b generate id, FLATTEN(v);",
+            "d = foreach c generate id, v::s2;"
+        };
+        final String store = "store d into 'empty';";
+
+        // Build the plan and optimize it.
+        LogicalPlanTester tester = new LogicalPlanTester();
+        for (int i = 0; i < script.length; i++) {
+            tester.buildPlan(script[i]);
+        }
+        org.apache.pig.impl.logicalLayer.LogicalPlan oldPlan = tester.buildPlan(store);
+        LogicalPlan optimized = migratePlan(oldPlan);
+
+        PlanOptimizer planOptimizer = new MyPlanOptimizer(optimized, 3);
+        planOptimizer.optimize();
+
+        // Build the reference plan from the same statements, without optimizing.
+        tester = new LogicalPlanTester();
+        for (int i = 0; i < script.length; i++) {
+            tester.buildPlan(script[i]);
+        }
+        oldPlan = tester.buildPlan(store);
+        LogicalPlan reference = migratePlan(oldPlan);
+
+        assertTrue(reference.isEqual(optimized));
+    }
+    
+    /**
+     * Optimizes two scripts with {@code MyPlanOptimizer} (3 iterations) and compares
+     * each result with a hand-written expected plan:
+     * a load/filter/foreach chain, and a join followed by a filter and a foreach.
+     */
+    public void testAddForeach() throws Exception  {
+        // --- Case 1: load -> filter -> foreach ---
+        LogicalPlanTester tester = new LogicalPlanTester();
+        tester.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+        tester.buildPlan("b = filter a by v1>10;");
+        tester.buildPlan("c = foreach b generate id;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan oldPlan =
+                tester.buildPlan("store c into 'empty';");
+        LogicalPlan actual = migratePlan(oldPlan);
+
+        new MyPlanOptimizer(actual, 3).optimize();
+
+        // Reference plan: v2 is not part of the load schema.
+        tester = new LogicalPlanTester();
+        tester.buildPlan("a = load 'd.txt' as (id, v1);");
+        tester.buildPlan("b = filter a by v1>10;");
+        tester.buildPlan("c = foreach b generate id;");
+        oldPlan = tester.buildPlan("store c into 'empty';");
+        LogicalPlan reference = migratePlan(oldPlan);
+
+        assertTrue(reference.isEqual(actual));
+
+        // --- Case 2: join -> filter -> foreach ---
+        tester = new LogicalPlanTester();
+        tester.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+        tester.buildPlan("b = load 'd.txt' as (id, v1, v2);");
+        tester.buildPlan("c = join a by id, b by id;");
+        tester.buildPlan("d = filter c by a::v1>b::v1;");
+        tester.buildPlan("e = foreach d generate a::id;");
+        oldPlan = tester.buildPlan("store e into 'empty';");
+        actual = migratePlan(oldPlan);
+
+        new MyPlanOptimizer(actual, 3).optimize();
+
+        // Reference plan: v2 is gone from both loads and an extra foreach sits
+        // directly after the join, projecting a::id, a::v1 and b::v1.
+        tester = new LogicalPlanTester();
+        tester.buildPlan("a = load 'd.txt' as (id, v1);");
+        tester.buildPlan("b = load 'd.txt' as (id, v1);");
+        tester.buildPlan("c = join a by id, b by id;");
+        tester.buildPlan("d = foreach c generate a::id, a::v1, b::v1;");
+        tester.buildPlan("e = filter d by a::v1>b::v1;");
+        tester.buildPlan("f = foreach e generate a::id;");
+        oldPlan = tester.buildPlan("store f into 'empty';");
+        reference = migratePlan(oldPlan);
+
+        assertTrue(reference.isEqual(actual));
+    }
+    
+    /**
+     * Optimizer used by the tests in this class. It supplies exactly two rule sets,
+     * each holding a single rule: {@code ColumnMapKeyPrune} first, then
+     * {@code AddForEach}.
+     */
+    public class MyPlanOptimizer extends LogicalPlanOptimizer {
+
+        /**
+         * @param p          plan handed to the parent optimizer
+         * @param iterations iteration count handed to the parent optimizer
+         */
+        protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
+            super(p, iterations);
+        }
+
+        /**
+         * Builds the rule sets for this optimizer.
+         *
+         * @return a two-element list: the column/map-key pruning set, then the
+         *         add-foreach set
+         */
+        protected List<Set<Rule>> buildRuleSets() {
+            Set<Rule> pruneSet = new HashSet<Rule>();
+            pruneSet.add(new ColumnMapKeyPrune("ColumnMapKeyPrune"));
+
+            Set<Rule> addForEachSet = new HashSet<Rule>();
+            addForEachSet.add(new AddForEach("AddForEach"));
+
+            // List order is preserved: pruning set first, add-foreach set second.
+            List<Set<Rule>> ruleSets = new ArrayList<Set<Rule>>();
+            ruleSets.add(pruneSet);
+            ruleSets.add(addForEachSet);
+            return ruleSets;
+        }
+    }
+}