Posted to commits@pig.apache.org by da...@apache.org on 2010/08/04 19:46:48 UTC

svn commit: r982345 [7/13] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/log...

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/AddForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/AddForEach.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/AddForEach.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/AddForEach.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.logical.Util;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+public class AddForEach extends WholePlanRule {
+    protected static final String STATUS = "AddForEach:Status";
+    protected static final int STATUSADDED = 1;
+    protected static final int STATUSNEWFOREACH = 2;
+    
+    public AddForEach(String n) {
+        super(n);		
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new AddForEachTransformer();
+    }
+    
+    public class AddForEachTransformer extends Transformer {
+        LogicalRelationalOperator opForAdd;
+        OperatorSubPlan subPlan;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            Iterator<Operator> iter = matched.getOperators();
+            while(iter.hasNext()) {
+                LogicalRelationalOperator op = (LogicalRelationalOperator)iter.next();
+                if ((op instanceof LOFilter||op instanceof LOSort||op instanceof LOSplitOutput) && shouldAdd(op)) {
+                    opForAdd = op;
+                    return true;
+                }
+            }
+            
+            return false;
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {        	
+            return subPlan;
+        }
+
+        private void addSuccessors(Operator op) throws IOException {
+            subPlan.add(op);
+            List<Operator> ll = op.getPlan().getSuccessors(op);
+            if (ll != null) {
+                for(Operator suc: ll) {
+                    addSuccessors(suc);
+                }
+            }
+        }
+        
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {            
+            addForeach(opForAdd);
+            
+            subPlan = new OperatorSubPlan(currentPlan);
+            addSuccessors(opForAdd);
+        }
+        
+        @SuppressWarnings("unchecked")
+        // check if an LOForEach should be added after the logical operator
+        private boolean shouldAdd(LogicalRelationalOperator op) throws IOException {
+            Integer status = (Integer)op.getAnnotation(STATUS);
+            if (status!=null && (status==STATUSADDED ||status==STATUSNEWFOREACH))
+                return false;
+            
+            Set<Long> outputUids = (Set<Long>)op.getAnnotation(ColumnPruneHelper.OUTPUTUIDS);
+            if (outputUids==null)
+                return false;
+            
+            LogicalSchema schema = op.getSchema();
+            if (schema==null)
+                return false;
+            
+            Set<Integer> columnsToDrop = new HashSet<Integer>();
+            
+            for (int i=0;i<schema.size();i++) {
+                if (!outputUids.contains(schema.getField(i).uid))
+                    columnsToDrop.add(i);
+            }
+            
+            if (!columnsToDrop.isEmpty()) return true;
+            
+            return false;
+        }
+        
+        @SuppressWarnings("unchecked")
+        private void addForeach(LogicalRelationalOperator op) throws IOException {
+            Set<Long> outputUids = (Set<Long>)op.getAnnotation(ColumnPruneHelper.OUTPUTUIDS);
+            LogicalSchema schema = op.getSchema();
+            Set<Integer> columnsToDrop = new HashSet<Integer>();
+            
+            for (int i=0;i<schema.size();i++) {
+                if (!outputUids.contains(schema.getField(i).uid))
+                    columnsToDrop.add(i);
+            }
+            
+            if (!columnsToDrop.isEmpty()) {
+                LOForEach foreach = Util.addForEachAfter((LogicalPlan)op.getPlan(), op, columnsToDrop);
+                op.annotate(STATUS, STATUSADDED);
+                foreach.annotate(STATUS, STATUSNEWFOREACH);
+            }
+        }
+    }          
+}
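
For context, a minimal sketch of how a whole-plan rule such as AddForEach is expected to be driven: the rule hands out a fresh Transformer, check() decides whether a qualifying LOFilter/LOSort/LOSplitOutput exists, and only then are transform() and reportChanges() called. The driver method below is hypothetical (the real loop lives in the optimizer framework, not in this commit); the rule and transformer methods are the ones defined above.

    // Hypothetical driver; assumes the same imports used by AddForEach above.
    void applyAddForEach(OperatorPlan matchedPlan) throws IOException {
        AddForEach rule = new AddForEach("AddForEach");
        Transformer t = rule.getNewTransformer();
        if (t.check(matchedPlan)) {                    // found an operator that needs a trailing ForEach
            t.transform(matchedPlan);                  // inserts the LOForEach and annotates STATUS
            OperatorPlan changed = t.reportChanges();  // the sub-plan touched by the transform
        }
    }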

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnMapKeyPrune.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnMapKeyPrune.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnMapKeyPrune.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnMapKeyPrune.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+/**
+ * This rule prunes columns and map keys and pushes the result down to the loader.
+ * It depends on MapKeysPruneHelper to calculate which map keys are required by a
+ * loader, and on ColumnPruneHelper to calculate the required columns for a loader.
+ * It then combines the map key and column information and sets it on the loader.
+ */
+public class ColumnMapKeyPrune extends WholePlanRule {
+    private boolean hasRun;
+    
+    public ColumnMapKeyPrune(String n) {
+        super(n);
+        hasRun = false;
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new ColumnMapKeyPruneTransformer();
+    }
+    
+    public class ColumnMapKeyPruneTransformer extends Transformer {
+        private MapKeysPruneHelper mapKeyHelper;
+        private ColumnPruneHelper columnHelper;
+        private boolean columnPrune;
+        private boolean mapKeyPrune;
+
+        /*
+         * This is a map of required columns and map keys for each LOLoad:
+         * RequiredMapKeys --> Map<Integer, Set<String> >
+         * RequiredColumns --> Set<Integer>
+         * 
+         * The integers are column indexes.
+         */
+        private Map<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>> requiredItems = 
+            new HashMap<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>>();
+        
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            // only run this rule once
+            if (hasRun) {
+                return false;
+            }
+            
+            hasRun = true;
+            mapKeyHelper = new MapKeysPruneHelper(matched);
+            columnHelper = new ColumnPruneHelper(matched);
+            
+            // check if map keys can be pruned
+            mapKeyPrune = mapKeyHelper.check();
+            // check if columns can be pruned
+            columnPrune = columnHelper.check();
+            
+            return mapKeyPrune || columnPrune;
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            return currentPlan;
+        }
+        
+        @SuppressWarnings("unchecked")
+        private void merge() {            
+            // combine annotations
+            for( Operator source : currentPlan.getSources() ) {
+                Map<Integer,Set<String>> mapKeys = 
+                    (Map<Integer, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+                Set<Integer> requiredColumns = null;
+                if (source.getAnnotation(ColumnPruneHelper.REQUIREDCOLS) != null) {
+                    requiredColumns = new HashSet<Integer>((Set<Integer>) source.getAnnotation(ColumnPruneHelper.REQUIREDCOLS));
+                }
+                
+                // We don't have any information, so skip this source
+                if( requiredColumns == null && mapKeys == null ) {
+                    continue;
+                }
+                                
+                if( requiredColumns != null && mapKeys != null ) { 
+
+                    Set<Integer> duplicatedCols = new HashSet<Integer>();
+
+                    // Remove the columns already marked by MapKeys
+                    for( Integer col : requiredColumns ) {
+                        if( mapKeys.containsKey(col) ) {
+                            duplicatedCols.add(col);
+                        }
+                    }
+                    requiredColumns.removeAll(duplicatedCols);
+                } else if ( mapKeys != null && requiredColumns == null ) {
+                    // This is the case where only map keys can be pruned and none
+                    // of the columns can be pruned, so we mark every column that
+                    // has no pruned map keys as required
+                    requiredColumns = new HashSet<Integer>();
+                    for(int i = 0; i < ((LogicalRelationalOperator)source).getSchema().size(); i++ ) {
+                        if( !mapKeys.containsKey(i) ) {
+                            requiredColumns.add(i);
+                        }
+                    }
+                }
+
+                requiredItems.put((LOLoad) source, new Pair<Map<Integer,Set<String>>,Set<Integer>>(mapKeys, requiredColumns));
+            }         
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {        	            
+            merge();
+            
+            ColumnPruneVisitor columnPruneVisitor = new ColumnPruneVisitor(currentPlan, requiredItems, columnPrune);
+            columnPruneVisitor.visit();
+        }
+    }
+}
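
To make the shape of requiredItems concrete, here is a hedged example of one entry as merge() would assemble it, with made-up column numbers and map keys (Pair is the org.apache.pig.impl.util.Pair imported above; java.util.Arrays and the collection imports are assumed):

    // Hypothetical entry: column 1 of the load only needs map keys "name" and "age",
    // while columns 0 and 2 are required in full.
    Map<Integer, Set<String>> mapKeys = new HashMap<Integer, Set<String>>();
    mapKeys.put(1, new HashSet<String>(Arrays.asList("name", "age")));
    Set<Integer> requiredColumns = new HashSet<Integer>(Arrays.asList(0, 2));
    Pair<Map<Integer, Set<String>>, Set<Integer>> item =
        new Pair<Map<Integer, Set<String>>, Set<Integer>>(mapKeys, requiredColumns);
    // requiredItems.put(load, item);   // 'load' being the LOLoad the annotations came from

The ColumnPruneVisitor later turns each such entry into a pushProjection request to the loader, or into an inserted foreach when the loader cannot honor it.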

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,573 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOStream;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.SchemaNotDefinedException;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+/**
+ * Helper class used by ColumnMapKeyPrune to figure out what columns can be pruned.
+ * It doesn't make any changes to the operator plan
+ *
+ */
+public class ColumnPruneHelper {
+    protected static final String INPUTUIDS = "ColumnPrune:InputUids";
+    public static final String OUTPUTUIDS = "ColumnPrune:OutputUids";    
+    protected static final String REQUIREDCOLS = "ColumnPrune:RequiredColumns";
+    
+    private OperatorPlan currentPlan;
+    private OperatorSubPlan subPlan;
+
+    public ColumnPruneHelper(OperatorPlan currentPlan) {
+        this.currentPlan = currentPlan;
+    }    
+    
+    private OperatorSubPlan getSubPlan() throws IOException {
+        OperatorSubPlan p = null;
+        if (currentPlan instanceof OperatorSubPlan) {
+            p = new OperatorSubPlan(((OperatorSubPlan)currentPlan).getBasePlan());
+        } else {
+            p = new OperatorSubPlan(currentPlan);
+        }
+        Iterator<Operator> iter = currentPlan.getOperators();
+        
+        while(iter.hasNext()) {
+            Operator op = iter.next();
+            if (op instanceof LOForEach) {
+                addOperator(op, p);
+            }
+        }
+        
+        return p;
+    }
+    
+    private void addOperator(Operator op, OperatorSubPlan subplan) throws IOException {
+        if (op == null) {
+            return;
+        }
+        
+        subplan.add(op);
+        
+        List<Operator> ll = currentPlan.getPredecessors(op);
+        if (ll == null) {
+            return;
+        }
+        
+        for(Operator pred: ll) {
+            addOperator(pred, subplan);
+        }
+    }
+    
+        
+    @SuppressWarnings("unchecked")
+    public boolean check() throws IOException {
+        List<Operator> sources = currentPlan.getSources();
+        // if this rule has run before, just return false
+        if (sources.get(0).getAnnotation(INPUTUIDS) != null) {
+            return false;
+        }
+        
+        // create sub-plan that ends with foreach
+        subPlan = getSubPlan();
+        if (subPlan.size() == 0) {
+            return false;
+        }
+        
+        ColumnDependencyVisitor v = new ColumnDependencyVisitor(subPlan);
+        try {
+            v.visit();
+        }catch(SchemaNotDefinedException e) {
+            // if any operator has an unknown schema, just return false
+            return false;
+        }
+        
+        List<Operator> ll = subPlan.getSources();
+        boolean found = false;
+        for(Operator op: ll) {
+            if (op instanceof LOLoad) {
+                Set<Long> uids = (Set<Long>)op.getAnnotation(INPUTUIDS);
+                LogicalSchema s = ((LOLoad) op).getSchema();
+                Set<Integer> required = getColumns(s, uids);
+                
+                if (required.size() < s.size()) {
+                    op.annotate(REQUIREDCOLS, required);              
+                    found = true;
+                }
+            }
+        }
+        
+        return found;
+    }
+
+    // get a set of column indexes from a set of uids
+    protected Set<Integer> getColumns(LogicalSchema schema, Set<Long> uids) throws IOException {
+        if (schema == null) {
+            throw new SchemaNotDefinedException("Schema is not defined.");
+        }
+        
+        Set<Integer> cols = new HashSet<Integer>();
+        Iterator<Long> iter = uids.iterator();
+        while(iter.hasNext()) {
+            long uid = iter.next();
+            int index = schema.findField(uid);
+            if (index == -1) {
+                throw new IOException("UID " + uid + " is not found in the schema");
+            }
+              
+            cols.add(index);
+        }
+          
+        return cols;
+    }
+    
+    public OperatorPlan reportChanges() {
+        return subPlan;
+    }
+   
+    // Visitor to calculate the input and output uids for each operator
+    // It doesn't change the plan; it only records the calculated info as annotations
+    // The input and output uids are not necessarily the top level uids of
+    // a schema. They may be the uids of lower level fields of complex fields
+    // that have their own schema.
+    static private class ColumnDependencyVisitor extends LogicalRelationalNodesVisitor {    	
+        
+        public ColumnDependencyVisitor(OperatorPlan plan) {
+            super(plan, new ReverseDependencyOrderWalker(plan));            
+        }
+        
+        @Override
+        public void visit(LOLoad load) throws IOException {
+            Set<Long> output = setOutputUids(load);
+            
+            // for load, input uids are same as output uids
+            load.annotate(INPUTUIDS, output);
+        }
+
+        @Override
+        public void visit(LOFilter filter) throws IOException {
+            Set<Long> output = setOutputUids(filter);
+            
+            // the input uids contain all the output uids plus the
+            // uids projected in the filter condition
+            Set<Long> input = new HashSet<Long>(output);
+            
+            LogicalExpressionPlan exp = filter.getFilterPlan();
+            collectUids(filter, exp, input);
+            
+            filter.annotate(INPUTUIDS, input);
+        }
+        
+        @Override
+        public void visit(LOStore store) throws IOException {
+            Set<Long> output = setOutputUids(store);            
+            
+            if (output.isEmpty()) {
+                // to deal with load-store-load-store case
+                LogicalSchema s = store.getSchema();
+                if (s == null) {
+                    throw new SchemaNotDefinedException("Schema for " + store.getName() + " is not defined.");
+                }
+                                
+                for(int i=0; i<s.size(); i++) {
+                    output.add(s.getField(i).uid);
+                }                                                
+            }        
+            
+            // for store, input uids are same as output uids
+            store.annotate(INPUTUIDS, output);
+        }
+        
+        @Override
+        public void visit(LOJoin join) throws IOException {
+            Set<Long> output = setOutputUids(join);
+            
+            // the input uids contain all the output uids plus the
+            // uids projected in the join expressions
+            Set<Long> input = new HashSet<Long>(output);
+            
+            Collection<LogicalExpressionPlan> exps = join.getExpressionPlans();
+            Iterator<LogicalExpressionPlan> iter = exps.iterator();
+            while(iter.hasNext()) {
+                LogicalExpressionPlan exp = iter.next();
+                collectUids(join, exp, input);
+            }
+            
+            join.annotate(INPUTUIDS, input);
+        }
+        
+        @Override
+        public void visit(LOCogroup cg) throws IOException {
+            Set<Long> output = setOutputUids(cg);
+            
+            // unlike filter and join, the input uids are not seeded with the
+            // output uids; they are built from the group keys and, below, from
+            // any grouped bags that survive in the output
+            Set<Long> input = new HashSet<Long>();
+            
+            // Add all the uids required for doing cogroup. As in all the
+            // keys on which the cogroup is done.
+            for( LogicalExpressionPlan plan : cg.getExpressionPlans().values() ) {
+                collectUids(cg, plan, input);
+            }
+            
+            // Now check for the case where an output uid is a generated one.
+            // If so, we need to add the uids that generated it to the input
+            Map<Integer,Long> generatedInputUids = cg.getGeneratedInputUids();
+            for( Map.Entry<Integer, Long> entry : generatedInputUids.entrySet() ) {
+                Long uid = entry.getValue();
+                if( output.contains(uid) ) {
+                    // Hence we need to add the full schema of that input's bag
+                    LogicalRelationalOperator pred =
+                        (LogicalRelationalOperator) cg.getPlan().getPredecessors(cg).get(entry.getKey());
+                    input.addAll( getAllUids( pred.getSchema() ) );
+                }
+            }
+            
+            cg.annotate(INPUTUIDS, input);
+        }
+        
+        @Override
+        public void visit(LOLimit limit) throws IOException {
+            Set<Long> output = setOutputUids(limit);
+            limit.annotate(INPUTUIDS, output);
+        }
+        
+        @Override
+        public void visit(LOStream stream) throws IOException {
+            Set<Long> input = new HashSet<Long>();
+            
+            // Every field is required
+            LogicalSchema s = stream.getSchema();
+            if (s == null) {
+                throw new SchemaNotDefinedException("Schema for " + stream.getName() + " is not defined.");
+            }
+            
+            for(int i=0; i<s.size(); i++) {
+                input.add(s.getField(i).uid);
+            }                                                
+            stream.annotate(INPUTUIDS, input);
+        }
+        
+        @Override
+        public void visit(LODistinct distinct) throws IOException {
+            Set<Long> input = new HashSet<Long>();
+            
+            // Every field is required
+            LogicalSchema s = distinct.getSchema();
+            if (s == null) {
+                throw new SchemaNotDefinedException("Schema for " + distinct.getName() + " is not defined.");
+            }
+            
+            for(int i=0; i<s.size(); i++) {
+                input.add(s.getField(i).uid);
+            }                                                
+            distinct.annotate(INPUTUIDS, input);
+        }
+        
+        @Override
+        public void visit(LOCross cross) throws IOException {
+            Set<Long> output = setOutputUids(cross);
+            // Since we do not change the topology of the plan, we keep
+            // at least one column from each predecessor.
+            List<Operator> preds = plan.getPredecessors(cross);
+            for (Operator pred : preds) {
+                LogicalSchema schema = ((LogicalRelationalOperator)pred).getSchema();
+                Set<Long> uids = getAllUids(schema);
+                boolean allPruned = true;
+                for (Long uid : uids) {
+                    if (output.contains(uid))
+                        allPruned = false;
+                }
+                if (allPruned)
+                    output.add(schema.getField(0).uid);
+            }
+            cross.annotate(INPUTUIDS, output);
+        }
+        
+        @Override
+        public void visit(LOUnion union) throws IOException {
+            Set<Long> output = setOutputUids(union);
+            Set<Long> input = new HashSet<Long>();
+            for (long uid : output) {
+                input.addAll(union.getInputUids(uid));
+            }
+            union.annotate(INPUTUIDS, input);
+        }
+        
+        @Override
+        public void visit(LOSplit split) throws IOException {
+            Set<Long> output = setOutputUids(split);
+            split.annotate(INPUTUIDS, output);
+        }
+        
+        @Override
+        public void visit(LOSplitOutput splitOutput) throws IOException {
+            Set<Long> output = setOutputUids(splitOutput);
+            
+            // the input uids contain all the output uids plus the
+            // uids projected in the split-output condition
+            Set<Long> input = new HashSet<Long>(output);
+            
+            LogicalExpressionPlan exp = splitOutput.getFilterPlan();
+            collectUids(splitOutput, exp, input);
+            
+            splitOutput.annotate(INPUTUIDS, input);
+        }
+        
+        @Override
+        public void visit(LOSort sort) throws IOException {
+            Set<Long> output = setOutputUids(sort);
+            
+            Set<Long> input = new HashSet<Long>(output);
+            
+            for (LogicalExpressionPlan exp : sort.getSortColPlans()) {
+                collectUids(sort, exp, input);
+            }
+            
+            sort.annotate(INPUTUIDS, input);
+        }
+        
+        /*
+         * This function returns all uids present in the given schema
+         */
+        private Set<Long> getAllUids( LogicalSchema schema ) {            
+            Set<Long> uids = new HashSet<Long>();
+            
+            if( schema == null ) {
+                return uids;
+            }
+            
+            for( LogicalFieldSchema field : schema.getFields() ) {
+                if( ( field.type == DataType.TUPLE || field.type == DataType.BAG )
+                        && field.schema != null ) {
+                   uids.addAll( getAllUids( field.schema ) );
+                }
+                uids.add( field.uid );
+            }
+            return uids;
+        }
+        
+        @Override
+        public void visit(LOForEach foreach) throws IOException {
+            Set<Long> output = setOutputUids(foreach);
+            
+            LogicalPlan innerPlan = foreach.getInnerPlan();
+            LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
+            gen.annotate(OUTPUTUIDS, output);
+            
+            visit(gen);
+            
+            foreach.annotate(INPUTUIDS, gen.getAnnotation(INPUTUIDS));
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public void visit(LOGenerate gen) throws IOException {
+             Set<Long> output = (Set<Long>)gen.getAnnotation(OUTPUTUIDS);
+             
+             Set<Long> input = new HashSet<Long>();
+             
+             List<LogicalExpressionPlan> ll = gen.getOutputPlans();
+             
+             Iterator<Long> iter = output.iterator();
+             while(iter.hasNext()) {
+                 long uid = iter.next();
+                 for(int i=0; i<ll.size(); i++) {
+                     boolean found = false;
+                     LogicalExpressionPlan exp = ll.get(i);
+                     LogicalExpression op = (LogicalExpression)exp.getSources().get(0);
+                     
+                     if (gen.getFlattenFlags()[i] && (op.getFieldSchema().type==DataType.TUPLE ||
+                             op.getFieldSchema().type== DataType.BAG)) {
+                     // the expression is flattened: the output uid may belong to
+                     // a field inside the expression's inner schema
+                         LogicalSchema schema;
+
+                         schema = op.getFieldSchema().schema;
+                         for (LogicalSchema.LogicalFieldSchema fs : schema.getFields())
+                         {
+                             if (fs.uid==uid) {
+                                 found = true;
+                                 break;
+                             }
+                         }
+                     }
+                     else {
+                         // No flatten, collect outer uid
+                         if (op.getFieldSchema().uid == uid) {                         
+                             found = true;
+                         }
+                     }
+                     
+                     if (found) {
+                         List<Operator> srcs = exp.getSinks();
+                         for (Operator src : srcs) {
+                             if (src instanceof ProjectExpression) {
+                                 List<LOInnerLoad> innerLoads = LOForEach.findReacheableInnerLoadFromBoundaryProject((ProjectExpression)src);
+                                 for (LOInnerLoad innerLoad : innerLoads) {
+                                     ProjectExpression prj = innerLoad.getProjection();
+                                     if (prj.isProjectStar()) {
+                                         if (prj.findReferent().getSchema()!=null) {
+                                             for (LogicalSchema.LogicalFieldSchema fs : prj.findReferent().getSchema().getFields()) {
+                                                 input.add(fs.uid);
+                                             }
+                                         }
+                                     }
+                                     else {
+                                         if (prj.findReferent().getSchema()!=null) {
+                                             LogicalSchema.LogicalFieldSchema fs = prj.findReferent().getSchema().getField(prj.getColNum()); 
+                                             input.add(fs.uid);
+                                         }
+                                     }
+                                 }
+                             }
+                         }
+                     }
+                 }
+             }
+              
+             // for each flattened bag, we need to make sure at least one field is in the input
+             for(int i=0; i<ll.size(); i++) {
+                 if (!gen.getFlattenFlags()[i]) {
+                     continue;
+                 }
+                 LogicalExpressionPlan exp = ll.get(i);
+                 List<Operator> srcs = exp.getSinks();
+                 for (Operator src : srcs) {
+                     List<LOInnerLoad> innerLoads = LOForEach.findReacheableInnerLoadFromBoundaryProject((ProjectExpression)src);
+                     for (LOInnerLoad innerLoad : innerLoads) {
+                         ProjectExpression prj = innerLoad.getProjection();
+                         if (prj.isProjectStar()) {
+                             if (prj.findReferent().getSchema()!=null) {
+                                 for (LogicalSchema.LogicalFieldSchema fs : prj.findReferent().getSchema().getFields()) {
+                                     input.add(fs.uid);
+                                 }
+                             }
+                         }
+                         else {
+                             if (prj.findReferent().getSchema()!=null) {
+                                 LogicalSchema.LogicalFieldSchema fs = prj.findReferent().getSchema().getField(prj.getColNum());
+                                 input.add(fs.uid);
+                             }
+                         }
+                     }
+                 }
+             }
+             gen.annotate(INPUTUIDS, input);
+        }
+        
+        @Override
+        public void visit(LOInnerLoad load) throws IOException {
+            Set<Long> output = setOutputUids(load);
+            load.annotate(INPUTUIDS, output);
+        }
+        
+        private void collectUids(LogicalRelationalOperator currentOp, LogicalExpressionPlan exp, Set<Long> uids) throws IOException {
+            List<Operator> ll = exp.getSinks();
+            for(Operator op: ll) {
+                if (op instanceof ProjectExpression) {
+                    if (!((ProjectExpression)op).isProjectStar()) {
+                        long uid = ((ProjectExpression)op).getFieldSchema().uid;
+                        uids.add(uid);
+                    } else {
+                        LogicalRelationalOperator ref = ((ProjectExpression)op).findReferent();
+                        LogicalSchema s = ref.getSchema();
+                        if (s == null) {
+                            throw new SchemaNotDefinedException("Schema not defined for " + ref.getAlias());
+                        }
+                        for(LogicalFieldSchema f: s.getFields()) {
+                            uids.add(f.uid);
+                        }
+                    }
+                }
+            }
+        }
+        
+        @SuppressWarnings("unchecked")
+        private Set<Long> setOutputUids(LogicalRelationalOperator op) throws IOException {
+            
+            List<Operator> ll = plan.getSuccessors(op);
+            Set<Long> uids = new HashSet<Long>();
+            
+            LogicalSchema s = op.getSchema();
+            if (s == null) {
+                throw new SchemaNotDefinedException("Schema for " + op.getName() + " is not defined.");
+            }
+                            
+            if (ll != null) {
+                // if this is not a sink, the output uids are the union of the input uids of its successors
+                for(Operator succ: ll) {
+                    Set<Long> inputUids = (Set<Long>)succ.getAnnotation(INPUTUIDS);
+                    if (inputUids != null) {
+                        Iterator<Long> iter = inputUids.iterator();
+                        while(iter.hasNext()) {
+                            long uid = iter.next();
+                            
+                            if (s.findField(uid) != -1) {
+                                uids.add(uid);
+                            }
+                        }
+                    }
+                }
+            } else {
+                // if it's a leaf, use all the uids from its schema
+                for(int i=0; i<s.size(); i++) {
+                    uids.add(s.getField(i).uid);
+                }                                
+            } 
+            
+            op.annotate(OUTPUTUIDS, uids);
+            return uids;
+        }
+    }
+}
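
A small illustrative sketch of the index computation getColumns() performs: given a schema and the set of uids still referenced downstream, it maps each uid to a column index with schema.findField(). The uid values below are invented for the example, and 'schema' stands for the LOLoad's LogicalSchema:

    // Hypothetical: a load schema with three fields whose uids are 10, 11 and 12,
    // of which only 10 and 12 are still referenced by downstream operators.
    Set<Long> neededUids = new HashSet<Long>(Arrays.asList(10L, 12L));
    Set<Integer> required = new HashSet<Integer>();
    for (long uid : neededUids) {
        int index = schema.findField(uid);   // -1 would mean the uid is not in the schema
        required.add(index);                 // here this yields {0, 2}
    }
    // check() would then annotate the LOLoad with REQUIREDCOLS = {0, 2},
    // because required.size() (2) < schema.size() (3).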

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.LoadPushDown.RequiredField;
+import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.Util;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+
+public class ColumnPruneVisitor extends LogicalRelationalNodesVisitor {
+    protected static final Log log = LogFactory.getLog(ColumnPruneVisitor.class);
+    private Map<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>> requiredItems = 
+        new HashMap<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>>();
+    private boolean columnPrune;
+
+    public ColumnPruneVisitor(OperatorPlan plan, Map<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>> requiredItems,
+            boolean columnPrune) {
+        super(plan, new ReverseDependencyOrderWalker(plan));
+        this.columnPrune = columnPrune;
+        this.requiredItems = requiredItems;
+    }
+    
+    public void addRequiredItems(LOLoad load, Pair<Map<Integer,Set<String>>,Set<Integer>> requiredItem) {
+        requiredItems.put(load, requiredItem);
+    }
+    
+    @Override
+    public void visit(LOLoad load) throws IOException {
+        if(! requiredItems.containsKey( load ) ) {
+            return;
+        }
+        
+        Pair<Map<Integer,Set<String>>,Set<Integer>> required = 
+            requiredItems.get(load);
+        
+        RequiredFieldList requiredFields = new RequiredFieldList();
+
+        LogicalSchema s = load.getSchema();
+        for (int i=0;i<s.size();i++) {
+            RequiredField requiredField = null;
+            // Because of the merge done earlier, we assume that a
+            // column is never present in both the column pruner and
+            // the map-key pruner
+            if( required.first != null && required.first.containsKey(i) ) {
+                requiredField = new RequiredField();
+                requiredField.setIndex(i);
+                requiredField.setType(s.getField(i).type);
+                List<RequiredField> subFields = new ArrayList<RequiredField>();
+                for( String key : required.first.get(i) ) {
+                    RequiredField subField = new RequiredField(key,-1,null,DataType.BYTEARRAY);
+                    subFields.add(subField);
+                }
+                requiredField.setSubFields(subFields);
+                requiredFields.add(requiredField);
+            }
+            if( required.second != null && required.second.contains(i) ) {
+                requiredField = new RequiredField();
+                requiredField.setIndex(i);
+                requiredField.setType(s.getField(i).type);      
+                requiredFields.add(requiredField);
+            }
+        }         
+            
+        log.info("Loader for " + load.getAlias() + " is pruned. Load fields " + requiredFields); 
+        for(RequiredField rf: requiredFields.getFields()) {
+            List<RequiredField> sub = rf.getSubFields();
+            if (sub != null) {
+                // log.info("For column " + rf.getIndex() + ", set map keys: " + sub.toString());
+                log.info("Map key required for " + load.getAlias() + ": $" + rf.getIndex() + "->" + sub);
+            }
+        }
+        
+        LoadPushDown.RequiredFieldResponse response = null;
+        try {
+            LoadFunc loadFunc = load.getLoadFunc();
+            if (loadFunc instanceof LoadPushDown) {
+                response = ((LoadPushDown)loadFunc).pushProjection(requiredFields);
+            }
+                                
+        } catch (FrontendException e) {
+            log.warn("pushProjection on "+load+" throw an exception, skip it");
+        }                      
+        
+        // If the loader does not honor the projection, insert a foreach after the load instead
+        if (columnPrune) {
+            if (response==null || !response.getRequiredFieldResponse()) {
+                LogicalPlan p = (LogicalPlan)load.getPlan();                        
+                Operator next = p.getSuccessors(load).get(0); 
+                // if there is already a LOForEach after load, we don't need to 
+                // add another LOForEach
+                if (next instanceof LOForEach) {
+                    return;
+                }
+                
+                LOForEach foreach = new LOForEach(load.getPlan());
+                
+                // add foreach to the base plan                       
+                p.add(foreach);
+                               
+                Pair<Integer,Integer> disconnectedPos = p.disconnect(load, next);
+                p.connect(load, disconnectedPos.first.intValue(), foreach, 0 );
+                p.connect(foreach, 0, next, disconnectedPos.second.intValue());
+                
+                LogicalPlan innerPlan = new LogicalPlan();
+                foreach.setInnerPlan(innerPlan);
+                
+                // build foreach inner plan
+                List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();              
+                LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[requiredFields.getFields().size()]);
+                innerPlan.add(gen);
+                
+                for (int i=0; i<requiredFields.getFields().size(); i++) {
+                    LoadPushDown.RequiredField rf = requiredFields.getFields().get(i);
+                    LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, rf.getIndex());                    
+                    innerPlan.add(innerLoad);          
+                    innerPlan.connect(innerLoad, gen);
+                    
+                    LogicalExpressionPlan exp = new LogicalExpressionPlan();
+                    ProjectExpression prj = new ProjectExpression(exp, i, -1, gen);
+                    exp.add(prj);
+                    exps.add(exp);
+                }                
+               
+            } else {
+                // the loader prunes the columns itself, so reset the schema of the LOLoad
+                List<Integer> requiredIndexes = new ArrayList<Integer>();
+                List<LoadPushDown.RequiredField> fieldList = requiredFields.getFields();
+                for (int i=0; i<fieldList.size(); i++) {                    
+                    requiredIndexes.add(fieldList.get(i).getIndex());
+                }
+
+                load.setRequiredFields(requiredIndexes);
+                
+                LogicalSchema newSchema = new LogicalSchema();
+                for (int i=0; i<fieldList.size(); i++) {                    
+                    newSchema.addField(s.getField(fieldList.get(i).getIndex()));
+                }
+                
+                load.setSchema(newSchema);
+            }
+        }
+    }
+
+    @Override
+    public void visit(LOFilter filter) throws IOException {
+    }
+    
+    @Override
+    public void visit(LOSplitOutput splitOutput) throws IOException {
+    }
+    
+    @Override
+    public void visit(LOSort sort) throws IOException {
+    }
+    
+    @Override
+    public void visit(LOStore store) throws IOException {
+    }
+    
+    @Override
+    public void visit( LOCogroup cg ) throws IOException {
+        addForEachIfNecessary(cg);
+    }
+    
+    @Override
+    public void visit(LOJoin join) throws IOException {
+    }
+    
+    @Override
+    public void visit(LOCross cross) throws IOException {
+    }
+    
+    @Override
+    @SuppressWarnings("unchecked")
+    public void visit(LOForEach foreach) throws IOException {
+        if (!columnPrune) {
+            return;
+        }
+        
+        // get column numbers from input uids
+        Set<Long> inputUids = (Set<Long>)foreach.getAnnotation(ColumnPruneHelper.INPUTUIDS);
+        
+        // Get all top level projects
+        LogicalPlan innerPlan = foreach.getInnerPlan();
+        List<LOInnerLoad> innerLoads= new ArrayList<LOInnerLoad>();
+        List<Operator> sources = innerPlan.getSources();
+        for (Operator s : sources) {
+            if (s instanceof LOInnerLoad)
+                innerLoads.add((LOInnerLoad)s);
+        }
+        
+        // If the projection of an innerLoad is not in INPUTUIDS, remove that innerLoad
+        Set<LOInnerLoad> innerLoadsToRemove = new HashSet<LOInnerLoad>();
+        for (LOInnerLoad innerLoad: innerLoads) {
+            ProjectExpression project = innerLoad.getProjection();
+            if (project.isProjectStar()) {
+                LogicalSchema.LogicalFieldSchema tupleFS = project.getFieldSchema();
+                // Check the first component of the star projection
+                long uid = tupleFS.schema.getField(0).uid;
+                if (!inputUids.contains(uid))
+                    innerLoadsToRemove.add(innerLoad);
+            }
+            else {
+                if (!inputUids.contains(project.getFieldSchema().uid))
+                    innerLoadsToRemove.add(innerLoad);
+            }
+        }
+        
+        // Find the logical operators immediately preceding LOGenerate whose whole branch should be removed
+        Set<LogicalRelationalOperator> branchHeadToRemove = new HashSet<LogicalRelationalOperator>();
+        for (LOInnerLoad innerLoad : innerLoadsToRemove) {
+            Operator op = innerLoad;
+            while (!(innerPlan.getSuccessors(op).get(0) instanceof LOGenerate)) {
+                op = innerPlan.getSuccessors(op).get(0);
+            }
+            branchHeadToRemove.add((LogicalRelationalOperator)op);
+        }
+        
+        // Find the expression plans to remove
+        LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
+        List<LogicalExpressionPlan> genPlansToRemove = new ArrayList<LogicalExpressionPlan>();
+        
+        List<LogicalExpressionPlan> genPlans = gen.getOutputPlans();
+        for (int i=0;i<genPlans.size();i++) {
+            LogicalExpressionPlan expPlan = genPlans.get(i);
+            List<Operator> expSources = expPlan.getSinks();
+            
+            for (Operator expSrc : expSources) {
+                if (expSrc instanceof ProjectExpression) {
+                    LogicalRelationalOperator reference = ((ProjectExpression)expSrc).findReferent();
+                    if (branchHeadToRemove.contains(reference)) {
+                        genPlansToRemove.add(expPlan);
+                    }
+                }
+            }
+        }
+        
+        // Build the temporary structures based on genPlansToRemove, which include:
+        // * flattenList
+        // * inputsRemoved
+        //     We first construct inputsNeeded, and inputsRemoved = (all inputs) - inputsNeeded.
+        //     We cannot figure out inputsRemoved directly since an input may be used by other output plans.
+        //     We can only get inputsRemoved after visiting all output plans.
+        List<Boolean> flattenList = new ArrayList<Boolean>();
+        Set<Integer> inputsNeeded = new HashSet<Integer>();
+        Set<Integer> inputsRemoved = new HashSet<Integer>();
+        
+        for (int i=0;i<genPlans.size();i++) {
+            LogicalExpressionPlan genPlan = genPlans.get(i);
+            if (!genPlansToRemove.contains(genPlan)) {
+                flattenList.add(gen.getFlattenFlags()[i]);
+                List<Operator> sinks = genPlan.getSinks();
+                for(Operator s: sinks) {
+                    if (s instanceof ProjectExpression) {
+                        inputsNeeded.add(((ProjectExpression)s).getInputNum());
+                    }
+                }
+            }
+        }
+        
+        List<Operator> preds = innerPlan.getPredecessors(gen);
+        for (int i=0;i<preds.size();i++) {
+            if (!inputsNeeded.contains(i))
+                inputsRemoved.add(i);
+        }
+        
+        // Change LOGenerate: remove the unneeded output expression plans
+        // and update the flatten flags
+        boolean[] flatten = new boolean[flattenList.size()];
+        for (int i=0;i<flattenList.size();i++)
+            flatten[i] = flattenList.get(i);
+
+        gen.setFlattenFlags(flatten);
+        
+        for (LogicalExpressionPlan genPlanToRemove : genPlansToRemove) {
+            genPlans.remove(genPlanToRemove);
+        }
+        
+        // shift project input indexes to account for the removed inputs
+        if (!inputsRemoved.isEmpty()) {
+            for (LogicalExpressionPlan genPlan : genPlans) {
+                List<Operator> sinks = genPlan.getSinks();
+                for(Operator s: sinks) {
+                    if (s instanceof ProjectExpression) {
+                        int input = ((ProjectExpression)s).getInputNum();
+                        int numToShift = 0;
+                        for (int i :inputsRemoved) {
+                            if (i<input)
+                                numToShift++;
+                        }
+                        ((ProjectExpression)s).setInputNum(input-numToShift);
+                    }
+                }
+            }
+        }
+        
+        // Prune unneeded LOInnerLoad
+        List<LogicalRelationalOperator> predToRemove = new ArrayList<LogicalRelationalOperator>();
+        for (int i : inputsRemoved) {
+            predToRemove.add((LogicalRelationalOperator)preds.get(i));
+        }
+        for (LogicalRelationalOperator pred : predToRemove) {
+            removeSubTree(pred);
+        }
+    }
+    
+    @Override
+    public void visit(LOUnion union) throws IOException {
+        // Add a ForEach before the union if necessary.
+        List<Operator> preds = new ArrayList<Operator>();
+        preds.addAll(plan.getPredecessors(union));
+        
+        for (Operator pred : preds) {
+            addForEachIfNecessary((LogicalRelationalOperator)pred);
+        }
+    }
+    
+    // remove an operator and all of its predecessors from the plan
+    private void removeSubTree(LogicalRelationalOperator op) throws IOException {
+        LogicalPlan p = (LogicalPlan)op.getPlan();
+        List<Operator> ll = p.getPredecessors(op);
+        if (ll != null) {
+            for(Operator pred: ll) {
+                removeSubTree((LogicalRelationalOperator)pred);
+            }
+        }
+                
+        if (p.getSuccessors(op) != null) {
+            Operator[] succs = p.getSuccessors(op).toArray(new Operator[0]);            
+            for(Operator s: succs) {
+                p.disconnect(op, s);
+            }
+        }
+        
+        p.remove(op);
+    }
+
+    // Add ForEach after op to prune unnecessary columns
+    @SuppressWarnings("unchecked")
+    private void addForEachIfNecessary(LogicalRelationalOperator op) throws IOException {
+        Set<Long> outputUids = (Set<Long>)op.getAnnotation(ColumnPruneHelper.OUTPUTUIDS);
+        LogicalSchema schema = op.getSchema();
+        Set<Integer> columnsToDrop = new HashSet<Integer>();
+        
+        for (int i=0;i<schema.size();i++) {
+            if (!outputUids.contains(schema.getField(i).uid))
+                columnsToDrop.add(i);
+        }
+        
+        if (!columnsToDrop.isEmpty()) {
+            LOForEach foreach = Util.addForEachAfter((LogicalPlan)op.getPlan(), op, columnsToDrop);
+            foreach.getSchema();
+        }
+    }    
+}
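
For illustration, a hedged sketch of the RequiredFieldList that visit(LOLoad) builds before calling pushProjection, reusing the made-up pruning decision from the earlier example: column 0 required in full, column 1 required only for map keys "name" and "age". The field types and indexes are assumptions for the example; the RequiredField/RequiredFieldList calls are the ones used above.

    // Hypothetical RequiredFieldList handed to a LoadPushDown-aware loader.
    RequiredFieldList requiredFields = new RequiredFieldList();

    RequiredField col0 = new RequiredField();
    col0.setIndex(0);
    col0.setType(DataType.CHARARRAY);            // whole column 0 is required
    requiredFields.add(col0);

    RequiredField col1 = new RequiredField();
    col1.setIndex(1);
    col1.setType(DataType.MAP);                  // only some keys of this map column
    List<RequiredField> keys = new ArrayList<RequiredField>();
    keys.add(new RequiredField("name", -1, null, DataType.BYTEARRAY));
    keys.add(new RequiredField("age", -1, null, DataType.BYTEARRAY));
    col1.setSubFields(keys);
    requiredFields.add(col1);

    // response = ((LoadPushDown) loadFunc).pushProjection(requiredFields);
    // If the loader declines, the visitor inserts a LOForEach after the load instead.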

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+/**
+ * This rule moves a filter above the foreach that precedes it.
+ * It checks whether all uids the filter's condition projects
+ * are present in a predecessor of the foreach.
+ * If so, the filter is pushed above the foreach.
+ */
+public class FilterAboveForeach extends Rule {
+
+    public FilterAboveForeach(String n) {
+        super(n);
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {
+        // the pattern that this rule looks for
+        // is foreach -> filter
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator foreach = new LOForEach(plan);
+        LogicalRelationalOperator filter = new LOFilter(plan);
+        
+        plan.add(foreach);
+        plan.add(filter);
+        plan.connect(foreach, filter);
+
+        return plan;
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new FilterAboveForEachTransformer();
+    }
+    
+    public class FilterAboveForEachTransformer extends Transformer {
+
+        LOFilter filter = null;
+        LOForEach foreach = null;
+        LogicalRelationalOperator forEachPred = null;
+        OperatorSubPlan subPlan = null;
+        
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            Iterator<Operator> iter = matched.getOperators();
+            while( iter.hasNext() ) {
+                Operator op = iter.next();
+                if( op instanceof LOForEach ) {
+                    foreach = (LOForEach)op;
+                    break;
+                }
+            }
+            
+            // This would be a strange case; the pattern guarantees a foreach, but guard anyway
+            if( foreach == null ) return false;
+            
+            iter = matched.getOperators();
+            while( iter.hasNext() ) {
+                Operator op = iter.next();
+                if( ( op instanceof LOFilter ) ) {
+                    filter = (LOFilter)op;
+                    break;
+                }
+            }
+            
+            // The matched pattern contains only one filter, but we also walk the
+            // chain of filters below it, looking for one that can be moved up
+            while( filter != null ) {
+                // Get uids of Filter
+                Pair<List<Long>, List<Byte>> uidWithTypes = getFilterProjectionUids(filter);
+
+                // See if the previous operators have uids from project
+                List<Operator> preds = currentPlan.getPredecessors(foreach);            
+                for(int j=0; j< preds.size(); j++) {
+                    LogicalRelationalOperator logRelOp = (LogicalRelationalOperator)preds.get(j);
+                    if (hasAll(logRelOp, uidWithTypes)) {
+                        forEachPred = (LogicalRelationalOperator) preds.get(j);
+                        return true;
+                    }
+                }
+                
+                // There may be filters below this one that can be moved up,
+                // so continue the search with the next filter in the chain
+                List<Operator> successors = currentPlan.getSuccessors(filter);
+                if( successors != null && successors.size() > 0 && 
+                        successors.get(0) instanceof LOFilter ) {
+                    filter = (LOFilter)successors.get(0);
+                } else {
+                    filter = null;
+                }
+            }
+            return false;            
+        }
+        
+        /**
+         * Get the uids (and their types) projected in the condition of this filter
+         * @param filter the filter whose projections are collected
+         * @return a pair of parallel lists: the projected uids and their types
+         */
+        private Pair<List<Long>, List<Byte>> getFilterProjectionUids(LOFilter filter) throws IOException {
+            List<Long> uids = new ArrayList<Long>();
+            List<Byte> types = new ArrayList<Byte>();
+            if( filter != null ) {
+                LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+                Iterator<Operator> iter = filterPlan.getOperators();            
+                Operator op = null;
+                while( iter.hasNext() ) {
+                    op = iter.next();
+                    if( op instanceof ProjectExpression ) {
+                        uids.add(((ProjectExpression)op).getFieldSchema().uid);
+                        types.add(((ProjectExpression)op).getFieldSchema().type);
+                    }
+                }
+            }
+            Pair<List<Long>, List<Byte>> result = new Pair<List<Long>, List<Byte>>(uids, types);
+            return result;
+        }
+        
+        /**
+         * Checks if a relational operator's schema contains all of the specified uids
+         * @param op LogicalRelationalOperator whose schema should contain the uids
+         * @param uidWithTypes uids, with their types, to check for
+         * @return true if the given LogicalRelationalOperator has all the given uids with matching types
+         */
+        private boolean hasAll(LogicalRelationalOperator op, Pair<List<Long>, 
+                List<Byte>> uidWithTypes) {
+            LogicalSchema schema = op.getSchema();
+            
+            if (schema==null)
+                return false;
+            
+            List<Long> uids = uidWithTypes.first;
+            List<Byte> types = uidWithTypes.second;
+            
+            for (int i=0;i<uids.size();i++) {
+                boolean found = false;
+                for (LogicalSchema.LogicalFieldSchema fs : schema.getFields()) {
+                    if (fs.uid==uids.get(i) && fs.type==types.get(i))
+                        found = true;
+                }
+                if (!found)
+                    return false;
+            }
+            return true;
+        }
+        
+        @Override
+        public OperatorPlan reportChanges() {            
+            return subPlan;
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {
+            
+            List<Operator> opSet = currentPlan.getPredecessors(filter);
+            if( ! ( opSet != null && opSet.size() > 0 ) ) {
+                return;
+            }
+            Operator filterPred = opSet.get(0);
+            
+            opSet = currentPlan.getSuccessors(filter);
+            if( ! ( opSet != null && opSet.size() > 0 ) ) {
+                return;
+            }
+            Operator filterSuc = opSet.get(0);
+            
+            subPlan = new OperatorSubPlan(currentPlan);
+            
+            // Steps below do the following
+            /*
+             *          ForEachPred
+             *               |
+             *            ForEach         
+             *               |
+             *             Filter*
+             *      ( These are filters
+             *      which cannot be moved )
+             *               |
+             *           FilterPred                 
+             *         ( is a Filter )
+             *               |
+             *             Filter
+             *        ( To be moved ) 
+             *               |
+             *            FilterSuc
+             *              
+             *               |
+             *               |
+             *        Transforms into 
+             *               |
+             *              \/            
+             *                      
+             *            ForEachPred
+             *               |
+             *            Filter
+             *     ( After being Moved )
+             *               |
+             *            ForEach
+             *               |
+             *             Filter*
+             *       ( These are filters
+             *      which cannot be moved )
+             *               |
+             *           FilterPred                 
+             *         ( is a Filter )
+             *               |
+             *            FilterSuc
+             *            
+             *  The plan above assumes we are moving a filter in the middle of the chain.
+             *  If we are moving the first filter after the ForEach then
+             *  -- * (Kleene star) becomes zero
+             *  -- and the ForEach itself is FilterPred
+             */
+            
+            Pair<Integer, Integer> forEachPredPlaces = currentPlan.disconnect(forEachPred, foreach);
+            Pair<Integer, Integer> filterPredPlaces = currentPlan.disconnect(filterPred, filter);
+            Pair<Integer, Integer> filterSucPlaces = currentPlan.disconnect(filter, filterSuc);
+            
+            currentPlan.connect(forEachPred, forEachPredPlaces.first, filter, filterPredPlaces.second);
+            currentPlan.connect(filter, filterSucPlaces.first, foreach, forEachPredPlaces.second);
+            currentPlan.connect(filterPred, filterPredPlaces.first, filterSuc, filterSucPlaces.second);
+            
+            subPlan.add(forEachPred);
+            subPlan.add(foreach);
+            subPlan.add(filterPred);
+            subPlan.add(filter);
+            subPlan.add(filterSuc);
+        }
+    }
+
+}

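The transform() in FilterAboveForeach relies on OperatorPlan.disconnect returning the port positions of the removed edge, so the subsequent connect calls can preserve input and output ordering. Below is a minimal sketch of that idiom under assumed operator names (a, b, c and the class name are placeholders, not part of this patch).

    import java.io.IOException;

    import org.apache.pig.impl.util.Pair;
    import org.apache.pig.newplan.Operator;
    import org.apache.pig.newplan.OperatorPlan;

    class EdgeRewireSketch {
        // Illustrative only: splice operator b out of the chain a -> b -> c while
        // preserving the port numbers of the original edges. FilterAboveForeach uses
        // the same disconnect/connect idiom to move the filter above the foreach.
        static void spliceOut(OperatorPlan plan, Operator a, Operator b, Operator c)
                throws IOException {
            Pair<Integer, Integer> ab = plan.disconnect(a, b);  // (output port on a, input port on b)
            Pair<Integer, Integer> bc = plan.disconnect(b, c);  // (output port on b, input port on c)
            // reconnect a directly to c, keeping a's output port and c's input port
            plan.connect(a, ab.first, c, bc.second);
        }
    }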
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.expression.MapLookupExpression;
+import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+/**
+ * This helper marks every load operator whose schema contains a map
+ * with the REQUIRED_MAPKEYS annotation. The annotation value is a
+ * <code>Map<Integer,Set<String>></code> where Integer is the column number
+ * of the field and Set is the set of keys looked up in that field ( map fields only ).
+ * 
+ * It does this only for the top-level schema of the load.
+ * 
+ * Algorithm:
+ *  Traverse the plan in reverse dependency order ( i.e. sink to source )
+ *      For LogicalRelationalOperators having a MapLookupExpression in their
+ *          expression plan, collect the uid and the keys looked up on it. This
+ *          information is retained in the visitor
+ *      For a ForEach with a nested LogicalPlan the same visitor is used, so
+ *          no distinction is required
+ *      At the sources, find all the uids provided by each source and annotate that
+ *      LogicalRelationalOperator ( load ) with a <code>Map<Integer,Set<String>></code>
+ *      containing only the column numbers that this LogicalRelationalOperator generates
+ *      
+ * NOTE: This is a simple map pruner. If a map key is mentioned in the script
+ *      then this pruner assumes the key is needed. It is not as aggressive as the
+ *      column pruner ( which removes a column if it is mentioned but never used )
+ *
+ */
+public class MapKeysPruneHelper {
+
+    public static final String REQUIRED_MAPKEYS = "MapPruner:RequiredKeys";
+    
+    private OperatorPlan currentPlan;
+    private OperatorSubPlan subplan;
+    
+    public MapKeysPruneHelper(OperatorPlan currentPlan) {
+        this.currentPlan = currentPlan;
+        
+        if (currentPlan instanceof OperatorSubPlan) {
+            subplan = new OperatorSubPlan(((OperatorSubPlan)currentPlan).getBasePlan());
+        } else {
+            subplan = new OperatorSubPlan(currentPlan);
+        }
+    }
+  
+
+    @SuppressWarnings("unchecked")
+    public boolean check() throws IOException {       
+        
+        // First check if we have a load with a map in it or not
+        List<Operator> sources = currentPlan.getSources();
+        
+        for( Operator source : sources ) {
+            LogicalSchema schema = ((LogicalRelationalOperator)source).getSchema();
+            // If any of the loads has a null schema we don't know the ramifications,
+            // so we skip this optimization
+            if( schema == null ) {
+                return false;
+            }
+        }
+                    
+        // Now we check what keys are needed
+        MapMarker marker = new MapMarker(currentPlan);
+        marker.visit();
+        
+        // Get all Uids from Sinks
+        List<Operator> sinks = currentPlan.getSinks();
+        Set<Long> sinkMapUids = new HashSet<Long>();
+        for( Operator sink : sinks ) {
+            LogicalSchema schema = ((LogicalRelationalOperator)sink).getSchema();
+            sinkMapUids.addAll( getMapUids( schema ) );
+        }
+        
+        
+        // If we have found specific keys that are needed then we return true;
+        // otherwise, if there are no specific keys, we return false
+        boolean hasAnnotation = false;
+        for( Operator source : sources ) {
+            Map<Integer,Set<String>> annotationValue = 
+                (Map<Integer, Set<String>>) ((LogicalRelationalOperator)source).getAnnotation(REQUIRED_MAPKEYS);
+            
+            // Maps that reach a sink in full cannot be pruned at the source, so drop them from the annotation
+            if( ! sinkMapUids.isEmpty() && annotationValue != null && 
+                    !annotationValue.isEmpty() ) {
+                Integer[] annotationKeyArray = annotationValue.keySet().toArray( new Integer[0] );
+                LogicalSchema sourceSchema = ((LogicalRelationalOperator)source).getSchema();
+                for( Integer col : annotationKeyArray ) {                	
+                    if( sinkMapUids.contains(sourceSchema.getField(col).uid)) {
+                        annotationValue.remove( col );
+                    }
+                }
+            }
+            
+            if ( annotationValue != null && annotationValue.isEmpty()) {
+                ((LogicalRelationalOperator)source).removeAnnotation(REQUIRED_MAPKEYS);
+                annotationValue = null;
+            }
+            
+            // Can we still prune any keys
+            if( annotationValue != null ) {
+                hasAnnotation = true;
+                subplan.add(source);
+            }
+        }
+        
+        // If none of the sources ended up with prunable map keys, we can't do any optimization
+        return hasAnnotation;
+    }
+    
+    /**
+     * This function checks if the schema has a map field.
+     * We don't check nested structures.
+     * @param schema Schema to be checked
+     * @return true if it has a map, else false
+     * @throws NullPointerException in case the schema is null
+     */
+    private boolean hasMap(LogicalSchema schema ) throws NullPointerException {
+        for( LogicalFieldSchema field : schema.getFields() ) {
+            if( field.type == DataType.MAP ) {
+                return true;
+            }
+        }
+        return false;
+    }
+    
+    /**
+     * This function returns the set of uids corresponding to
+     * map-typed fields in the first level of this schema
+     * @param schema Schema having fields
+     * @return set of uids of the map fields; empty if the schema is null
+     */
+    private Set<Long> getMapUids(LogicalSchema schema ) {
+        Set<Long> uids = new HashSet<Long>();
+        if( schema != null ) {
+            for( LogicalFieldSchema field : schema.getFields() ) {
+                if( field.type == DataType.MAP ) {
+                    uids.add( field.uid );
+                }
+            }
+        }
+        return uids;
+    }
+
+    public OperatorPlan reportChanges() {
+        return subplan;
+    }
+
+      
+    /**
+     * This class collects all the information required to create
+     * the list of keys required for a map
+     */
+    static public class MapMarker extends AllExpressionVisitor {
+        
+        Map<Long,Set<String>> inputUids = null;
+
+        protected MapMarker(OperatorPlan plan) {
+            super(plan, new ReverseDependencyOrderWalker(plan));
+            inputUids = new HashMap<Long,Set<String>>();
+        }
+        
+        @Override
+        public void visit(LOLoad load) throws IOException {
+            if( load.getSchema() != null ) {
+                Map<Integer,Set<String>> annotation = new HashMap<Integer,Set<String>>();
+                for( int i=0; i<load.getSchema().size(); i++) {
+                    LogicalFieldSchema field = load.getSchema().getField(i);
+                    if( inputUids.containsKey( field.uid ) ) {
+                        annotation.put(i, inputUids.get( field.uid ) );
+                    }
+                }
+                load.annotate(REQUIRED_MAPKEYS, annotation);
+            }
+        }
+
+        @Override
+        public void visit(LOFilter filter) throws IOException {
+            currentOp = filter;
+            MapExprMarker v = (MapExprMarker) getVisitor(filter.getFilterPlan());
+            v.visit();
+            mergeUidKeys( v.inputUids );
+        }
+        
+        @Override
+        public void visit(LOJoin join) throws IOException {
+            currentOp = join;
+            Collection<LogicalExpressionPlan> c = join.getExpressionPlans();
+            for (LogicalExpressionPlan plan : c) {
+                MapExprMarker v = (MapExprMarker) getVisitor(plan);
+                v.visit();
+                mergeUidKeys( v.inputUids );
+            }
+        }
+        
+        @Override
+        public void visit(LOGenerate gen) throws IOException {
+            currentOp = gen;
+            Collection<LogicalExpressionPlan> plans = gen.getOutputPlans();
+            for( LogicalExpressionPlan plan : plans ) {
+                MapExprMarker v = (MapExprMarker) getVisitor(plan);
+                v.visit();
+                mergeUidKeys( v.inputUids );
+            }
+        }
+        
+        @Override
+        public void visit(LOSort sort) throws IOException {
+            currentOp = sort;
+            Collection<LogicalExpressionPlan> c = sort.getSortColPlans();
+            for (LogicalExpressionPlan plan : c) {
+                MapExprMarker v = (MapExprMarker) getVisitor(plan);
+                v.visit();
+                mergeUidKeys( v.inputUids );
+            }
+        }
+        
+        private void mergeUidKeys( Map<Long, Set<String> > inputMap ) {
+            for( Map.Entry<Long, Set<String>> entry : inputMap.entrySet() ) {
+                if( inputUids.containsKey(entry.getKey()) ) {
+                    Set<String> mapKeySet = inputUids.get(entry.getKey());
+                    mapKeySet.addAll(entry.getValue());
+                } else {
+                    inputUids.put(entry.getKey(), inputMap.get(entry.getKey()));
+                }
+            }
+        }
+
+        @Override
+        protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
+            return new MapExprMarker(expr );
+        }
+        
+        static class MapExprMarker extends LogicalExpressionVisitor {
+
+            Map<Long,Set<String>> inputUids = null;
+            
+            protected MapExprMarker(OperatorPlan p) {
+                super(p, new DependencyOrderWalker(p));
+                inputUids = new HashMap<Long,Set<String>>();
+            }
+
+            @Override
+            public void visit(MapLookupExpression op) throws IOException {
+                Long uid = op.getMap().getFieldSchema().uid;
+                String key = op.getLookupKey();
+                
+                HashSet<String> mapKeySet = null;
+                if( inputUids.containsKey(uid) ) {
+                    mapKeySet = (HashSet<String>) inputUids.get(uid);                                        
+                } else {
+                    mapKeySet = new HashSet<String>();
+                    inputUids.put(uid, mapKeySet);
+                }
+                mapKeySet.add(key);
+            }
+        }
+    }
+}

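Downstream code can read the annotation left by MapMarker back off the load operator. A small sketch follows, assuming a load that has already been visited; the class name is invented for illustration and is not part of this patch.

    import java.util.Map;
    import java.util.Set;

    import org.apache.pig.newplan.logical.relational.LOLoad;
    import org.apache.pig.newplan.logical.rules.MapKeysPruneHelper;

    class RequiredMapKeysDump {
        // Sketch: print the per-column map keys recorded on a load by MapKeysPruneHelper.
        @SuppressWarnings("unchecked")
        static void dump(LOLoad load) {
            Map<Integer, Set<String>> requiredKeys = (Map<Integer, Set<String>>)
                    load.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
            if (requiredKeys == null) {
                return;  // no map columns, or the helper has not run on this plan
            }
            for (Map.Entry<Integer, Set<String>> e : requiredKeys.entrySet()) {
                // key: column number of a map field in the load's schema
                // value: the map keys the script actually looks up in that field
                System.out.println("column " + e.getKey() + " needs keys " + e.getValue());
            }
        }
    }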
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+public class MergeFilter extends Rule {
+
+    public MergeFilter(String n) {
+        super(n);       
+    }
+
+    @Override
+    public Transformer getNewTransformer() {        
+        return new MergeFilterTransformer();
+    }
+
+    public class MergeFilterTransformer extends Transformer {
+
+        private OperatorSubPlan subPlan;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {           
+            LOFilter filter = (LOFilter)matched.getSources().get(0);
+            List<Operator> succeds = currentPlan.getSuccessors(filter);
+            // if this filter is followed by another filter, we should combine them
+            if (succeds != null && succeds.size() == 1) {
+                if (succeds.get(0) instanceof LOFilter) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {     
+            subPlan = new OperatorSubPlan(currentPlan);
+            
+            LOFilter filter = (LOFilter)matched.getSources().get(0);
+
+            subPlan.add(filter);
+            
+            List<Operator> succeds = currentPlan.getSuccessors(filter);
+            if (succeds != null && succeds.size()== 1 && (succeds.get(0) instanceof LOFilter)) {
+                LOFilter next = (LOFilter)succeds.get(0);
+                combineFilterCond(filter, next);
+                Pair<Integer, Integer> p1 = currentPlan.disconnect(filter, next);
+                List<Operator> ll = currentPlan.getSuccessors(next);
+                if (ll!= null && ll.size()>0) {
+                    Operator op = ll.get(0);
+                    Pair<Integer, Integer> p2 = currentPlan.disconnect(next, op);
+                    currentPlan.connect(filter, p1.first, op, p2.second);
+                    subPlan.add(op);
+                }
+                
+                currentPlan.remove(next);
+            }
+            
+            Iterator<Operator> iter = filter.getFilterPlan().getOperators();
+            while (iter.hasNext()) {
+                Operator oper = iter.next();
+                if (oper instanceof ProjectExpression) {
+                    ((ProjectExpression)oper).setAttachedRelationalOp(filter);
+                }
+            }
+        }        
+        
+        @Override
+        public OperatorPlan reportChanges() {          
+            return subPlan;
+        }
+        
+        // Combine the conditions of two filters. The condition of the second filter
+        // is ANDed into the condition of the first filter.
+        private void combineFilterCond(LOFilter f1, LOFilter f2) throws IOException {
+            LogicalExpressionPlan p1 = f1.getFilterPlan();
+            LogicalExpressionPlan p2 = f2.getFilterPlan();
+            LogicalExpressionPlan andPlan = new LogicalExpressionPlan();
+            
+            // add existing operators          
+            Iterator<Operator> iter = p1.getOperators();
+            while(iter.hasNext()) {
+                andPlan.add(iter.next());
+            }
+            
+            iter = p2.getOperators();
+            while(iter.hasNext()) {
+                andPlan.add(iter.next());
+            }
+            
+            // add all connections
+            iter = p1.getOperators();
+            while(iter.hasNext()) {
+                Operator n = iter.next();
+                List<Operator> l = p1.getPredecessors(n);
+                if (l != null) {
+                    for(Operator op: l) {
+                        andPlan.connect(op, n);
+                    }
+                }
+            }
+            
+            iter = p2.getOperators();
+            while(iter.hasNext()) {
+                Operator n = iter.next();
+                List<Operator> l = p2.getPredecessors(n);
+                if (l != null) {
+                    for(Operator op: l) {
+                        andPlan.connect(op, n);
+                    }
+                }
+            }          
+            
+            // create an AND
+            new AndExpression(andPlan, (LogicalExpression)p1.getSources().get(0), (LogicalExpression)p2.getSources().get(0));          
+            
+            f1.setFilterPlan(andPlan);
+        }
+
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {        
+        // the pattern that this rule looks for
+        // is filter operator
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator op = new LOFilter(plan);
+        plan.add(op);        
+        
+        return plan;
+    }
+}
+
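One detail of combineFilterCond() worth noting: the new AndExpression is never assigned or explicitly added to andPlan, which indicates that the expression constructors in the new plan framework register the node and its edges in the plan passed to them. A condensed sketch of the final step is below, assuming the operators and edges of both condition plans have already been copied into the merged plan exactly as above; the class and parameter names are illustrative only.

    import java.io.IOException;

    import org.apache.pig.newplan.logical.expression.AndExpression;
    import org.apache.pig.newplan.logical.expression.LogicalExpression;
    import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
    import org.apache.pig.newplan.logical.relational.LOFilter;

    class MergeConditionSketch {
        // Sketch: AND together the sources of the two original condition plans and
        // install the merged plan on the surviving filter, mirroring combineFilterCond.
        static void installMergedCondition(LOFilter keep, LogicalExpressionPlan merged,
                LogicalExpressionPlan c1, LogicalExpressionPlan c2) throws IOException {
            LogicalExpression left = (LogicalExpression) c1.getSources().get(0);
            LogicalExpression right = (LogicalExpression) c2.getSources().get(0);
            // The constructor wires the new AND node into 'merged' as its single source,
            // so the instance itself does not need to be kept.
            new AndExpression(merged, left, right);
            keep.setFilterPlan(merged);
        }
    }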