You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/01/04 23:30:58 UTC

svn commit: r895805 - in /hadoop/pig/branches/load-store-redesign: src/org/apache/pig/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/optimizer/ test/org/apache/pig/test/

Author: pradeepkth
Date: Mon Jan  4 22:30:57 2010
New Revision: 895805

URL: http://svn.apache.org/viewvc?rev=895805&view=rev
Log:
PIG-1090: Update sources to reflect recent changes in load-store interfaces - changes to implement interactions with LoadMetadata interface from pig runtime code -committing files missed last time (pradeepkth)

Added:
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Expression.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/PColFilterExtractor.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPartitionFilterOptimization.java

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Expression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Expression.java?rev=895805&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Expression.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Expression.java Mon Jan  4 22:30:57 2010
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+
+/**
+ * A class to communicate Filter expressions to LoadFuncs
+ */
+public abstract class Expression {
+
+ // Operator type                                                                                                                                                                                                                                                                                       
+    public static  enum OpType {
+        
+        // binary arith ops
+        OP_PLUS (" + "),
+        OP_MINUS(" - "),
+        OP_TIMES(" * "),
+        OP_DIV(" / "),
+        OP_MOD(" % "),
+          
+        //binary ops
+        OP_EQ(" == "),
+        OP_NE(" != "),
+        OP_GT(" > "),
+        OP_GE(" >= "),
+        OP_LT(" < "),
+        OP_LE(" <= "),
+
+        //binary logical
+        OP_AND(" and "),
+        OP_OR(" or "),
+        TERM_COL(" Column "),
+        TERM_CONST(" Constant ");
+        
+        private String str = "";
+        private OpType(String rep){
+            this.str = rep;
+        }
+        private OpType(){
+        }
+        
+        @Override
+        public String toString(){
+            return this.str;
+        }
+        
+    }
+    
+    protected OpType opType;
+    
+    /**
+     * @return the opType
+     */
+    public OpType getOpType() {
+        return opType;
+    }
+
+    
+    
+    
+    public static class BinaryExpression extends Expression {
+        
+        /**
+         * left hand operand
+         */
+        Expression lhs;
+        
+        /**
+         * right hand operand
+         */
+        Expression rhs;
+    
+        
+        /**
+         * @param lhs
+         * @param rhs
+         */
+        public BinaryExpression(Expression lhs, Expression rhs, OpType opType) {
+            this.opType = opType;
+            this.lhs = lhs;
+            this.rhs = rhs;
+        }
+    
+        /**
+         * @return the left hand operand
+         */
+        public Expression getLhs() {
+            return lhs;
+        }
+    
+        /**
+         * @return the right hand operand
+         */
+        public Expression getRhs() {
+            return rhs;
+        }
+        
+        @Override
+        public String toString() {
+            return "(" + lhs.toString() + opType.toString() + rhs.toString()
+                                + ")";
+        }
+    }
+    
+    public static class Column extends Expression {
+        
+        /**
+         * name of column
+         */
+        private String name;
+    
+        /**
+         * @param name
+         */
+        public Column(String name) {
+            this.opType = OpType.TERM_COL;
+            this.name = name;
+        }
+        
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        /**
+         * @return the name
+         */
+        public String getName() {
+            return name;
+        }
+
+        /**
+         * @param name the name to set
+         */
+        public void setName(String name) {
+            this.name = name;
+        }
+    }
+    
+    public static class Const extends Expression {
+        
+        /**
+         * value of the constant
+         */
+        Object value;
+    
+        /**
+         * @return the value
+         */
+        public Object getValue() {
+            return value;
+        }
+    
+        /**
+         * @param value
+         */
+        public Const(Object value) {
+            this.opType = OpType.TERM_CONST;
+            this.value = value;
+        }
+        
+        @Override
+        public String toString() {
+            return (value instanceof String) ? "\'" + value + "\'": 
+                value.toString();
+        }
+    }
+
+}
+
+

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/PColFilterExtractor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/PColFilterExtractor.java?rev=895805&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/PColFilterExtractor.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/PColFilterExtractor.java Mon Jan  4 22:30:57 2010
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.Expression;
+import org.apache.pig.PigException;
+import org.apache.pig.Expression.BinaryExpression;
+import org.apache.pig.Expression.OpType;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This Visitor works on the filter condition of a LOFilter which immediately 
+ * follows a LOLoad that interacts with a metadata system (currently OWL) to 
+ * read table data. The visitor looks for conditions on partition columns in the
+ * filter condition and extracts those conditions out of the filter condition.
+ * The condition on partition cols will be used to prune partitions of the table.
+ *
+ */
+public class PColFilterExtractor extends LOVisitor {
+
+    /**
+     * partition columns associated with the table
+     * present in the load on which the filter whose
+     * inner plan is being visited is applied
+     */
+    private List<String> partitionCols;
+    
+    /**
+     * will contain the partition column filter conditions
+     * accumulated during the visit - the final condition will an expression
+     * built from these sub expressions connected with AND
+     */
+    private ArrayList<Expression> pColConditions = new ArrayList<Expression>();
+    
+    /**
+     * flag used during visit to indicate if a partition key
+     * was seen
+     */
+    private boolean sawKey;
+    
+    private boolean sawNonKeyCol;
+    
+    private enum Side { LEFT, RIGHT, NONE };
+    private Side replaceSide = Side.NONE;
+    
+    private boolean filterRemovable = false;
+    
+    @Override
+    public void visit() throws VisitorException {
+        // we will visit the leaf and it will recursively walk the plan
+        try {
+            ExpressionOperator leaf = (ExpressionOperator)mPlan.getLeaves().get(0);
+            // if the leaf is a unary operator it should be a FilterFunc in 
+            // which case we don't try to extract partition filter conditions
+            if(leaf instanceof BinaryExpressionOperator) {
+                visit((BinaryExpressionOperator)leaf);
+                replaceChild(leaf);
+                // if the entire expression is to be removed, then the above
+                // replaceChild will not set sawKey to false (sawKey is set to
+                // false only in replaceChild()
+                if(sawKey == true) {
+                    //there are only conditions on partition columns in the filter
+                    //extract it
+                    pColConditions.add(getExpression(leaf));
+                    filterRemovable = true;
+                }
+            }
+        } catch (FrontendException e) {
+            throw new VisitorException(e);
+        }
+        
+    }
+    
+    /**
+     * 
+     * @param plan logical plan corresponding the filter's comparison condition
+     * @param partitionCols list of partition columns of the table which is
+     * being loaded in the LOAD statement which is input to the filter
+     */
+    public PColFilterExtractor(LogicalPlan plan,
+            List<String> partitionCols) {
+        // though we configure a DepthFirstWalker to be the walker, we will not
+        // use it - we will visit the leaf and it will recursively walk the
+        // plan
+        super(plan, new DepthFirstWalker<LogicalOperator, 
+                LogicalPlan>(plan));
+        this.partitionCols = new ArrayList<String>(partitionCols);
+    }
+    
+    @Override
+    protected void visit(LOProject project) throws VisitorException {
+        try {
+            String fieldName = project.getFieldSchema().alias;
+            if(partitionCols.contains(fieldName)) {
+                sawKey = true;
+                // The condition on partition column will be used to prune the
+                // scan and removed from the filter condition. Hence the condition 
+                // on the partition column will not be re applied when data is read,
+                // so the following cases should throw error until that changes.
+                List<Class<?>> opsToCheckFor = new 
+                ArrayList<Class<?>>();
+                opsToCheckFor.add(LORegexp.class);
+                int errCode = 1110;
+                if(checkSuccessors(project, opsToCheckFor)) {
+                    throw new FrontendException("Unsupported query: " +
+                            "You have an partition column (" 
+                            + fieldName + ") inside a regexp operator in the " +
+                            		"filter condition.", errCode, PigException.INPUT);
+                } 
+                opsToCheckFor.set(0, LOUserFunc.class);
+                if(checkSuccessors(project, opsToCheckFor)) {
+                    throw new FrontendException("Unsupported query: " +
+                            "You have an partition column (" 
+                            + fieldName + ") inside a function in the " +
+                                    "filter condition.", errCode, PigException.INPUT);
+                }
+                opsToCheckFor.set(0, LOCast.class);
+                if(checkSuccessors(project, opsToCheckFor)) {
+                    throw new FrontendException("Unsupported query: " +
+                            "You have an partition column (" 
+                            + fieldName + ") inside a cast in the " +
+                                    "filter condition.", errCode, PigException.INPUT);                }
+                
+                opsToCheckFor.set(0, LOIsNull.class);
+                if(checkSuccessors(project, opsToCheckFor)) {
+                    throw new FrontendException("Unsupported query: " +
+                            "You have an partition column (" 
+                            + fieldName + ") inside a null check operator in the " +
+                                    "filter condition.", errCode, PigException.INPUT);                }
+                opsToCheckFor.set(0, LOBinCond.class);
+                if(checkSuccessors(project, opsToCheckFor)) {
+                    throw new FrontendException("Unsupported query: " +
+                            "You have an partition column (" 
+                            + fieldName + ") inside a bincond operator in the " +
+                                    "filter condition.", errCode, PigException.INPUT);
+                }
+                opsToCheckFor.set(0, LOAnd.class);
+                opsToCheckFor.add(LOOr.class);
+                if(checkSuccessors(project, opsToCheckFor)) {
+                    errCode = 1112;
+                    throw new FrontendException("Unsupported query: " +
+                            "You have an partition column (" + fieldName +
+                            " ) in a construction like: " +
+                            "(pcond  and ...) or (pcond and ...) " +
+                            "where pcond is a condition on a partition column.",
+                            errCode, PigException.INPUT);
+                }
+            } else {
+                sawNonKeyCol = true;
+            }
+        } catch (FrontendException e) {
+            throw new VisitorException(e);
+        }
+    }
+    
+    @Override
+    protected void visit(BinaryExpressionOperator binOp)
+            throws VisitorException {
+
+        try {
+            boolean lhsSawKey = false;        
+            boolean rhsSawKey = false;        
+            boolean lhsSawNonKeyCol = false;        
+            boolean rhsSawNonKeyCol = false;        
+            
+            sawKey = false;
+            sawNonKeyCol = false;
+            binOp.getLhsOperand().visit(this);
+            replaceChild(binOp.getLhsOperand());
+            lhsSawKey = sawKey;
+            lhsSawNonKeyCol = sawNonKeyCol;
+            
+
+            sawKey = false;
+            sawNonKeyCol = false;
+            binOp.getRhsOperand().visit(this);
+            replaceChild(binOp.getRhsOperand());
+            rhsSawKey = sawKey;
+            rhsSawNonKeyCol = sawNonKeyCol;
+            
+            // only in the case of an AND, we potentially split the AND to 
+            // remove conditions on partition columns out of the AND. For this 
+            // we set replaceSide accordingly so that when we reach a predecessor
+            // we can trim the appropriate side. If both sides of the AND have 
+            // conditions on partition columns, we will remove the AND completely - 
+            // in this case, we will not set replaceSide, but sawKey will be 
+            // true so that as we go to higher predecessor ANDs we can trim later.
+            if(binOp instanceof LOAnd) {
+                if(lhsSawKey && rhsSawNonKeyCol){
+                    replaceSide = Side.LEFT;
+                }else if(rhsSawKey && lhsSawNonKeyCol){
+                    replaceSide = Side.RIGHT;
+                }
+            } else if(lhsSawKey && rhsSawNonKeyCol || rhsSawKey && lhsSawNonKeyCol){
+                int errCode = 1111;
+                String errMsg = "Use of partition column/condition with" +
+                " non partition column/condition in filter expression is not " +
+                "supported." ;
+                throw new FrontendException(errMsg, errCode, PigException.INPUT);
+            }
+
+            sawKey = lhsSawKey || rhsSawKey;
+            sawNonKeyCol = lhsSawNonKeyCol || rhsSawNonKeyCol;
+        } catch (FrontendException e) {
+            throw new VisitorException(e);
+        }        
+    }
+    
+    
+    
+    /**
+     * @return the condition on partition columns extracted from filter
+     */
+    public  Expression getPColCondition(){
+        if(pColConditions.size() == 0)
+            return null;
+        Expression cond =  pColConditions.get(0);
+        for(int i=1; i<pColConditions.size(); i++){
+            //if there is more than one condition expression
+            // connect them using "AND"s
+            cond = new BinaryExpression(cond, pColConditions.get(i),
+                    OpType.OP_AND);
+        }
+        return cond;
+    }
+
+    /**
+     * @return the filterRemovable
+     */
+    public boolean isFilterRemovable() {
+        return filterRemovable;
+    }
+    
+    //////// helper methods /////////////////////////
+    /**
+     * check for the presence of a certain operator type in the Successors
+     * @param opToStartFrom
+     * @param opsToCheckFor operators to be checked for at each level of 
+     * Successors - the ordering in the list is the order in which the ops 
+     * will be checked.
+     * @return true if opsToCheckFor are found
+     * @throws FrontendException 
+     */
+    private boolean checkSuccessors(LogicalOperator opToStartFrom, 
+            List<Class<?>> opsToCheckFor) throws FrontendException {
+        boolean done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
+        if(!done && !opsToCheckFor.isEmpty()) {
+            // continue checking if there is more to check
+            while(!done) {
+                opToStartFrom = mPlan.getSuccessors(opToStartFrom).get(0);
+                done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
+            }
+        }
+        return opsToCheckFor.isEmpty();
+    }
+
+    private boolean checkSuccessorsHelper(LogicalOperator opToStartFrom, 
+            List<Class<?>> opsToCheckFor) throws FrontendException {
+        List<LogicalOperator> successors = mPlan.getSuccessors(
+                opToStartFrom);
+        if(successors == null || successors.size() == 0) {
+            return true; // further checking cannot be done
+        }
+        if(successors.size() == 1) {
+            LogicalOperator suc  = successors.get(0);
+            if(suc.getClass().getCanonicalName().equals(
+                    opsToCheckFor.get(0).getCanonicalName())) {
+                // trim the list of operators to check
+                opsToCheckFor.remove(0);
+                if(opsToCheckFor.isEmpty()) {
+                    return true; //no further checks required
+                }
+            }
+        } else {
+            throwException();
+        }
+        return false; // more checking can be done
+    }
+    
+    private void replaceChild(ExpressionOperator childExpr) throws 
+    FrontendException {
+        
+        if(replaceSide == Side.NONE) {
+            // the child is trimmed when the appropriate
+            // flag is set to indicate that it needs to be trimmed.
+            return;
+        }
+        
+        // eg if replaceSide == Side.LEFT
+        //    binexpop
+        //   /   \ \ 
+        // child (this is the childExpr argument send in)
+        //  /  \
+        // Lt   Rt 
+        //
+        // gets converted to 
+        //  binexpop
+        //  /
+        // Rt
+        
+        if(! (childExpr instanceof BinaryExpressionOperator)){
+            throwException();
+        }
+        // child's lhs operand
+        ExpressionOperator childLhs = 
+            ((BinaryExpressionOperator)childExpr).getLhsOperand();
+        // child's rhs operand
+        ExpressionOperator childRhs = 
+            ((BinaryExpressionOperator)childExpr).getRhsOperand();
+        
+        mPlan.disconnect(childLhs, childExpr);
+        mPlan.disconnect(childRhs, childExpr);
+        
+        if(replaceSide == Side.LEFT) {
+            // remove left child and replace childExpr with its right child
+            remove(childLhs);
+            mPlan.replace(childExpr, childRhs);
+        } else if(replaceSide == Side.RIGHT){
+            // remove right child and replace childExpr with its left child
+            remove(childRhs);
+            mPlan.replace(childExpr, childLhs);
+        }else {
+            throwException();
+        }
+        //reset 
+        replaceSide = Side.NONE;
+        sawKey = false;
+
+    }
+    
+    /**
+     * @param op
+     * @throws FrontendException 
+     */
+    private void remove(ExpressionOperator op) throws FrontendException {
+        pColConditions.add(getExpression(op));
+        mPlan.trimAbove(op);
+        mPlan.remove(op);
+    }
+
+    public static Expression getExpression(ExpressionOperator op) throws 
+    FrontendException {
+        if(op instanceof LOConst) {
+            return new Expression.Const(((LOConst)op).getValue());
+        } else if (op instanceof LOProject) {
+            String fieldName = ((LOProject)op).getFieldSchema().alias;
+            return new Expression.Column(fieldName);
+        } else {
+            if(!(op instanceof BinaryExpressionOperator)) {
+                throwException();
+            }
+            BinaryExpressionOperator binOp = (BinaryExpressionOperator)op;
+            if(binOp instanceof LOAdd) {
+                return getExpression(binOp, OpType.OP_PLUS);
+            } else if(binOp instanceof LOSubtract) {
+                return getExpression(binOp, OpType.OP_MINUS);
+            } else if(binOp instanceof LOMultiply) {
+                return getExpression(binOp, OpType.OP_TIMES);
+            } else if(binOp instanceof LODivide) {
+                return getExpression(binOp, OpType.OP_DIV);
+            } else if(binOp instanceof LOMod) {
+                return getExpression(binOp, OpType.OP_MOD);
+            } else if(binOp instanceof LOAnd) {
+                return getExpression(binOp, OpType.OP_AND);
+            } else if(binOp instanceof LOOr) {
+                return getExpression(binOp, OpType.OP_OR);
+            } else if(binOp instanceof LOEqual) {
+                return getExpression(binOp, OpType.OP_EQ);
+            } else if(binOp instanceof LONotEqual) {
+                return getExpression(binOp, OpType.OP_NE);
+            } else if(binOp instanceof LOGreaterThan) {
+                return getExpression(binOp, OpType.OP_GT);
+            } else if(binOp instanceof LOGreaterThanEqual) {
+                return getExpression(binOp, OpType.OP_GE);
+            } else if(binOp instanceof LOLesserThan) {
+                return getExpression(binOp, OpType.OP_LT);
+            } else if(binOp instanceof LOLesserThanEqual) {
+                return getExpression(binOp, OpType.OP_LE);
+            } else {
+                throwException();
+            }
+        }
+        return null;
+    }
+    
+    private static Expression getExpression(BinaryExpressionOperator binOp, OpType 
+            opType) throws FrontendException {
+        return new BinaryExpression(getExpression(binOp.getLhsOperand())
+                ,getExpression(binOp.getRhsOperand()), opType);
+    }
+
+    public static void throwException() throws FrontendException {
+        int errCode = 2209;
+        throw new FrontendException(
+                "Internal error while processing any partition filter " +
+                "conditions in the filter after the load" ,
+                errCode,
+                PigException.BUG
+        );
+    }
+    
+    // unfortunately LOVisitor today has each visit() method separately defined
+    // so just implementing visit(BinaryExpressionOperator) will not result in 
+    // that method being call when LOAdd (say) is encountered (sigh! - we should
+    // fix that at some point) - for now, let's define visit() on each specific 
+    // BinaryExpressionOperator that we want to visit to inturn call the
+    // visit(BinaryExpressionOperator) method
+    @Override
+    public void visit(LOAdd op) throws VisitorException {
+        visit((BinaryExpressionOperator)op);
+    }
+    @Override
+    public void visit(LOAnd op) throws VisitorException {
+        visit((BinaryExpressionOperator)op);
+    }
+    @Override
+    public void visit(LODivide op) throws VisitorException {
+        visit((BinaryExpressionOperator)op);
+    }
+    @Override
+    public void visit(LOEqual op) throws VisitorException {
+        visit((BinaryExpressionOperator)op);
+    }
+    @Override
+    public void visit(LOGreaterThan op) throws VisitorException {
+        visit((BinaryExpressionOperator)op);
+    }
+    @Override
+    public void visit(LOGreaterThanEqual op) throws VisitorException {
+        visit((BinaryExpressionOperator)op);
+    }
+  
+    @Override
+    public void visit(LOLesserThan op) throws VisitorException {
+        visit((BinaryExpressionOperator)op);
+    }
+    @Override
+    public void visit(LOLesserThanEqual op) throws VisitorException {
+        visit((BinaryExpressionOperator)op);
+    }
+    @Override
+    public void visit(LOMod op) throws VisitorException {
+        visit((BinaryExpressionOperator)op);
+    }
+    @Override
+    public void visit(LOMultiply op) throws VisitorException {
+        visit((BinaryExpressionOperator)op);
+    }
+    @Override
+    public void visit(LONotEqual op) throws VisitorException {
+        visit((BinaryExpressionOperator)op);
+    }
+    @Override
+    public void visit(LOOr op) throws VisitorException {
+        visit((BinaryExpressionOperator)op);
+    }
+    @Override
+    public void visit(LOSubtract op) throws VisitorException {
+        visit((BinaryExpressionOperator)op);
+    }
+    
+    // this might get called from some visit() - in that case, delegate to
+    // the other visit()s which we have defined here 
+    @Override
+    protected void visit(ExpressionOperator op) throws VisitorException {
+        if(op instanceof LOProject) {
+            visit((LOProject)op);
+        } else if (op instanceof BinaryExpressionOperator) {
+            visit((BinaryExpressionOperator)op);
+        } else if (op instanceof LOCast) {
+            visit((LOCast)op);
+        } else if (op instanceof LOBinCond) {
+            visit((LOBinCond)op);
+        } else if (op instanceof LOUserFunc) {
+            visit((LOUserFunc)op);
+        } else if (op instanceof LOIsNull) {
+            visit((LOIsNull)op);
+        }
+        
+    }
+    
+    // some specific operators which are of interest to catch some
+    // unsupported scenarios
+    @Override
+    protected void visit(LOCast cast) throws VisitorException {
+        visit(cast.getExpression());
+    }
+    
+    @Override
+    public void visit(LONot not) throws VisitorException {
+        visit(not.getOperand());   
+    }
+    
+    @Override
+    protected void visit(LORegexp regexp) throws VisitorException {
+        visit((BinaryExpressionOperator)regexp);    
+    }
+    
+    @Override
+    protected void visit(LOBinCond binCond) throws VisitorException {
+        visit(binCond.getCond());
+        visit(binCond.getLhsOp());
+        visit(binCond.getRhsOp());
+    }
+    
+    @Override
+    protected void visit(LOUserFunc udf) throws VisitorException {
+        for (ExpressionOperator op : udf.getArguments()) {
+            visit(op);
+        }
+    }
+    
+    @Override
+    public void visit(LOIsNull isNull) throws VisitorException {
+        visit(isNull.getOperand());
+    }
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java?rev=895805&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java Mon Jan  4 22:30:57 2010
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.optimizer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigException;
+import org.apache.pig.Expression.BinaryExpression;
+import org.apache.pig.Expression.Column;
+import org.apache.pig.Expression.OpType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOFilter;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.PColFilterExtractor;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+
+/**
+ * When the load statement in a pig script is loading a table from a meta data
+ * system (like owl), the load can be followed by a filter which can contain
+ * conditions on partition columns. This filter can also contain conditions on
+ * non partition columns. This optimizer looks at the logical plan and checks if
+ * there is a load followed by such a filter which has conditions on partition
+ * columns. If so, it extracts the conditions on partition columns out of the
+ * filter.
+ */
+public class PartitionFilterOptimizer extends
+        LogicalTransformer {
+    
+    private String[] partitionKeys;
+    
+    /**
+     * a reference to the LoadMetada implementation 
+     */
+    private LoadMetadata loadMetadata;
+
+    /**
+     * a reference to the LoadFunc implementation
+     */
+    private LoadFunc loadFunc;
+    
+    private LOLoad loLoad;
+    private LOFilter loFilter;
+    
+    /**
+     * flag to ensure we only do the optimization once for performance reasons
+     */
+    private boolean alreadyCalled = false;
+    
+    /**
+     * a map between column names as reported in 
+     * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}
+     * and as present in {@link LOLoad#getSchema()}. The two will be different 
+     * when the user has provided a schema in the load statement
+     */
+    private Map<String, String> colNameMap = new HashMap<String, String>();
+    
+    /**
+     * a map between column nameas as present in {@link LOLoad#getSchema()} and
+     * as reported in 
+     * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}.
+     * The two will be different when the user has provided a schema in the 
+     * load statement.
+     */
+    private Map<String, String> reverseColNameMap = new HashMap<String, String>();
+    
+
+    protected PartitionFilterOptimizer(LogicalPlan plan) {
+        super(plan);
+    }
+
+    @Override
+    public boolean check(List<LogicalOperator> nodes) throws OptimizerException 
+    {
+        if(!alreadyCalled) {
+            // first call
+            alreadyCalled = true;
+        } else {
+            // already called, just return
+            return false; 
+        }
+        if((nodes == null) || (nodes.size() <= 0)) {
+            int errCode = 2052;
+            String msg = "Internal error. Cannot retrieve operator from null " +
+            		"or empty list.";
+            throw new OptimizerException(msg, errCode, PigException.BUG);
+        }
+        if(nodes.size() != 1|| !(nodes.get(0) instanceof LOLoad)) {
+            return false;
+        }
+        loLoad = (LOLoad)nodes.get(0);
+        List<LogicalOperator> sucs = mPlan.getSuccessors(loLoad);
+        if(sucs == null || sucs.size() != 1 || !(sucs.get(0) instanceof LOFilter)) {
+            return false;
+        }
+        loFilter = (LOFilter)sucs.get(0);
+        
+        // we have to check more only if LoadFunc implements LoadMetada
+        loadFunc = loLoad.getLoadFunc();
+        if(!(loadFunc instanceof LoadMetadata)) {
+            return false;
+        }
+        loadMetadata = (LoadMetadata)loadFunc;
+        try {
+            partitionKeys = loadMetadata.getPartitionKeys(
+                    loLoad.getInputFile().getFileName(), loLoad.getConfiguration());
+            if(partitionKeys == null || partitionKeys.length == 0) {
+                return false;
+            }
+        } catch (IOException e) {
+            int errCode = 2209;
+            throw new OptimizerException(
+                    "Internal error while processing any partition filter " +
+                    "conditions in the filter after the load" ,
+                    errCode,
+                    PigException.BUG
+            );
+        }
+        
+        // we found a load-filter pattern where the load returns partition keys
+        return true;
+    }
+
+    @Override
+    public void transform(List<LogicalOperator> nodes)
+            throws OptimizerException {
+        try {
+            setupColNameMaps();
+            PColFilterExtractor pColFilterFinder = new PColFilterExtractor(
+                    loFilter.getComparisonPlan(), getMappedKeys(partitionKeys));
+            pColFilterFinder.visit();
+            Expression partitionFilter = pColFilterFinder.getPColCondition();
+            if(partitionFilter != null) {
+                // the column names in the filter may be the ones provided by
+                // the user in the schema in the load statement - we may need
+                // to replace them with partition column names as given by
+                // LoadFunc.getSchema()
+                updateMappedColNames(partitionFilter);
+                loadMetadata.setPartitionFilter(partitionFilter);
+                if(pColFilterFinder.isFilterRemovable()) {
+                    // remove this filter from the plan
+                    mPlan.removeAndReconnect(loFilter);
+                }
+            }
+        } catch (Exception e) {
+            int errCode = 2209;
+            throw new OptimizerException(
+                    "Internal error while processing any partition filter " +
+                    "conditions in the filter after the load:" ,
+                    errCode,
+                    PigException.BUG,
+                    e
+            );
+        }
+    }
+    
+    
+
+    /**
+     * @param expr
+     */
+    private void updateMappedColNames(Expression expr) {
+        if(expr instanceof BinaryExpression) {
+            updateMappedColNames(((BinaryExpression) expr).getLhs());
+            updateMappedColNames(((BinaryExpression) expr).getRhs());
+        } else if (expr instanceof Column) {
+            Column col = (Column) expr;
+            col.setName(reverseColNameMap.get(col.getName()));
+        }
+    }
+
+    /**
+     * The partition keys in the argument are as reported by 
+     * {@link LoadMetadata#getPartitionKeys(String, org.apache.hadoop.conf.Configuration)}.
+     * The user may have renamed these by providing a schema with different names
+     * in the load statement - this method will replace the former names with
+     * the latter names.
+     * @param partitionKeys
+     * @return
+     */
+    private List<String> getMappedKeys(String[] partitionKeys) {
+        List<String> mappedKeys = new ArrayList<String>(partitionKeys.length);
+        for (int i = 0; i < partitionKeys.length; i++) {
+            mappedKeys.add(colNameMap.get(partitionKeys[i]));
+        }
+        return mappedKeys;
+    }
+
+    
+    
+    /**
+     * @throws FrontendException 
+     * 
+     */
+    private void setupColNameMaps() throws FrontendException {
+        Schema loadFuncSchema = loLoad.getDeterminedSchema();
+        Schema loLoadSchema = loLoad.getSchema();
+        for(int i = 0; i < loadFuncSchema.size(); i++) {
+            colNameMap.put(loadFuncSchema.getField(i).alias, 
+                    (i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
+                        loadFuncSchema.getField(i).alias));
+            
+            reverseColNameMap.put((i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
+                        loadFuncSchema.getField(i).alias), 
+                        loadFuncSchema.getField(i).alias);
+        }
+    }
+
+}

Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPartitionFilterOptimization.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPartitionFilterOptimization.java?rev=895805&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPartitionFilterOptimization.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPartitionFilterOptimization.java Mon Jan  4 22:30:57 2010
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.ExecType;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigServer;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.ExpressionOperator;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOFilter;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.PColFilterExtractor;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.junit.Test;
+
+/**
+ * unit tests to test extracting partition filter conditions out of the filter
+ * condition in the filter following a load which talks to metadata system (.i.e.
+ * implements {@link LoadMetadata})
+ */
+public class TestPartitionFilterOptimization extends TestCase {
+
+    LogicalPlanTester lpTester;
+    
+    @Override
+    protected void setUp() throws Exception {
+        lpTester = new LogicalPlanTester();
+        lpTester.buildPlan("a = load 'foo' as (srcid, mrkt, dstid, name, age);");
+    }
+
+    /**
+     * test case where there is a single expression on partition columns in 
+     * the filter expression along with an expression on non partition column
+     * @throws FrontendException 
+     */
+    @Test
+    public void testSimpleMixed() throws FrontendException {
+        LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by srcid == 10 and name == 'foo';");
+        test(lp, Arrays.asList("srcid"), "(srcid == 10)", "(name == 'foo')");
+    }
+    
+    /**
+     * test case where filter does not contain any condition on partition cols
+     * @throws Exception
+     */
+    @Test
+    public void testNoPartFilter() throws Exception {
+        LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by age == 20 and name == 'foo';");
+        test(lp, Arrays.asList("srcid"), null, 
+                "((age == 20) and (name == 'foo'))");
+    }
+    
+    /**
+     * test case where filter only contains condition on partition cols
+     * @throws Exception
+     */
+    @Test
+    public void testOnlyPartFilter1() throws Exception {
+        LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by srcid > 20 and mrkt == 'us';");
+        test(lp, Arrays.asList("srcid", "mrkt"), 
+                    "((srcid > 20) and (mrkt == 'us'))", null);
+        
+    }
+    
+    /**
+     * test case where filter only contains condition on partition cols
+     * @throws Exception
+     */
+    @Test
+    public void testOnlyPartFilter2() throws Exception {
+        LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by mrkt == 'us';");
+        test(lp, Arrays.asList("srcid", "mrkt"), 
+                    "(mrkt == 'us')", null);
+        
+    }
+    
+    /**
+     * test case where filter only contains condition on partition cols
+     * @throws Exception
+     */
+    @Test
+    public void testOnlyPartFilter3() throws Exception {
+        LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by srcid == 20 or mrkt == 'us';");
+        test(lp, Arrays.asList("srcid", "mrkt"), 
+                    "((srcid == 20) or (mrkt == 'us'))", null);
+        
+    }
+    
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns
+     */
+    @Test
+    public void testMixed1() throws Exception {
+        LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by " +
+            		"(age < 20 and  mrkt == 'us') and (srcid == 10 and " +
+            		"name == 'foo');");
+        test(lp, Arrays.asList("srcid", "mrkt"), 
+                "((mrkt == 'us') and (srcid == 10))", 
+                "((age < 20) and (name == 'foo'))");
+    }
+    
+    
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns
+     */
+    @Test
+    public void testMixed2() throws Exception {
+        LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by " +
+                    "(age >= 20 and  mrkt == 'us') and (srcid == 10 and " +
+                    "dstid == 15);");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+                "((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))", 
+                "(age >= 20)");
+    }
+    
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns
+     */
+    @Test
+    public void testMixed3() throws Exception {
+        LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by " +
+                    "age >= 20 and  mrkt == 'us' and srcid == 10;");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+                "((mrkt == 'us') and (srcid == 10))", "(age >= 20)");
+    }
+    
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns - this testcase also has a condition
+     * based on comparison of two partition columns
+     */
+    @Test
+    public void testMixed4() throws Exception {
+        LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by " +
+                    "age >= 20 and  mrkt == 'us' and name == 'foo' and " +
+                    "srcid == dstid;");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+                "((mrkt == 'us') and (srcid == dstid))", 
+                "((age >= 20) and (name == 'foo'))");
+    }
+    
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns - 
+     * This testcase has two partition col conditions  with OR +  non parition 
+     * col conditions
+     */
+    @Test
+    public void testMixed5() throws Exception {
+        LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by " +
+                    "(srcid == 10 or mrkt == 'us') and name == 'foo' and " +
+                    "dstid == 30;");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+                "(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))", 
+                "(name == 'foo')");
+    }
+    
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns - 
+     * This testcase has two partition col conditions  with OR +  non parition 
+     * col conditions
+     */
+    @Test
+    public void testMixed6() throws Exception {
+        LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by " +
+                    "dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+                "((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))", 
+                "(name == 'foo')");
+    }
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns. This testcase also tests arithmetic
+     * in partition column conditions
+     */
+    @Test
+    public void testMixedArith() throws Exception {
+        LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by " +
+                    "mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+                "((mrkt == 'us') and ((srcid * 10) == (150 + 20)))", 
+                "(age != 15)");
+    }
+    
+    @Test
+    public void testNegPColConditionWithNonPCol() throws Exception {
+        // use of partition column condition and non partition column in 
+        // same condition should fail
+        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                    "srcid > age;");
+        negativeTest(lp, Arrays.asList("srcid"), 1111);
+        lp =  lpTester.buildPlan("b = filter a by " +
+                    "srcid + age == 20;");
+        negativeTest(lp, Arrays.asList("srcid"), 1111);
+
+        // OR of partition column condition and non partiton col condition 
+        // should fail
+        lp = lpTester.buildPlan("b = filter a by " +
+                    "srcid > 10 or name == 'foo';");
+        negativeTest(lp, Arrays.asList("srcid"), 1111);
+    }
+    
+    @Test
+    public void testNegPColInWrongPlaces() throws Exception {
+        
+        int expectedErrCode = 1112;
+        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+        "(srcid > 10 and name == 'foo') or dstid == 10;");
+        negativeTest(lp, Arrays.asList("srcid", "dstid"), expectedErrCode); 
+        
+        expectedErrCode = 1110;
+        lp = lpTester.buildPlan("b = filter a by " +
+                "CONCAT(mrkt, '_10') == 'US_10' and age == 20;");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+        
+        lp = lpTester.buildPlan("b = filter a by " +
+                "mrkt matches '.*us.*' and age < 15;");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+        
+        lp = lpTester.buildPlan("b = filter a by " +
+                "(int)mrkt == 10 and name matches '.*foo.*';");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"),expectedErrCode);
+        
+        lp = lpTester.buildPlan("b = filter a by " +
+            "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+        
+        lp = lpTester.buildPlan("b = filter a by " +
+            "(mrkt is null) and name matches '.*foo.*';");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+        
+        lp = lpTester.buildPlan("b = filter a by " +
+            "(mrkt is not null) and name matches '.*foo.*';");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+    }
+    
+    
+    /**
+     * Test that pig sends correct partition column names in setPartitionFilter
+     * when the user has a schema in the load statement which renames partition
+     * columns
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping1() throws Exception {
+        TestLoader.partFilter = null;
+        lpTester.buildPlan("a = load 'foo' using "
+            + TestLoader.class.getName() + 
+            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+            "'srcid,mrkt') as (f1, f2, f3, f4, f5);");
+        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+        		"(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);");
+        lpTester.typeCheckPlan(lp);
+        lpTester.optimizePlan(lp);
+        assertEquals("checking partition filter:",             
+                    "((mrkt == 'us') and (srcid == 10))",
+                    TestLoader.partFilter.toString());
+        LOFilter filter = (LOFilter) lp.getLeaves().get(0);
+        String actual = PColFilterExtractor.getExpression(
+                (ExpressionOperator) filter.getComparisonPlan().
+                getLeaves().get(0)).
+                toString().toLowerCase();
+        assertEquals("checking trimmed filter expression:", 
+                "((f5 >= 20) and (f3 == 15))", actual);
+    }
+    
+    
+    /**
+     * Test that pig sends correct partition column names in setPartitionFilter
+     * when the user has a schema in the load statement which renames partition
+     * columns - in this test case there is no condition on partition columns
+     * - so setPartitionFilter() should not be called and the filter condition
+     * should remain as is.
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping2() throws Exception {
+        TestLoader.partFilter = null;
+        lpTester.buildPlan("a = load 'foo' using "
+            + TestLoader.class.getName() + 
+            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+            "'srcid') as (f1, f2, f3, f4, f5);");
+        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                "f5 >= 20 and f2 == 'us' and f3 == 15;");
+        lpTester.typeCheckPlan(lp);
+        lpTester.optimizePlan(lp);
+        assertEquals("checking partition filter:",             
+                    null,
+                    TestLoader.partFilter);
+        LOFilter filter = (LOFilter) lp.getLeaves().get(0);
+        String actual = PColFilterExtractor.getExpression(
+                (ExpressionOperator) filter.getComparisonPlan().
+                getLeaves().get(0)).
+                toString().toLowerCase();
+        assertEquals("checking trimmed filter expression:", 
+                "(((f5 >= 20) and (f2 == 'us')) and (f3 == 15))", actual);
+    }
+    
+    /**
+     * Test that pig sends correct partition column names in setPartitionFilter
+     * when the user has a schema in the load statement which renames partition
+     * columns - in this test case the filter only has conditions on partition
+     * columns
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping3() throws Exception {
+        TestLoader.partFilter = null;
+        lpTester.buildPlan("a = load 'foo' using "
+            + TestLoader.class.getName() + 
+            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+            "'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);");
+        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);");
+        lpTester.typeCheckPlan(lp);
+        lpTester.optimizePlan(lp);
+        assertEquals("checking partition filter:",             
+                    "(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " +
+                    "(dstid == 15)))",
+                    TestLoader.partFilter.toString());
+        Iterator<LogicalOperator> it = lp.iterator();
+        assertTrue("Checking that filter has been removed since it contained" +
+        		" only conditions on partition cols:", 
+        		(it.next() instanceof LOLoad));
+        assertFalse("Checking that filter has been removed since it contained" +
+                " only conditions on partition cols:", 
+                it.hasNext());
+        
+    }
+    
+    /**
+     * Test that pig sends correct partition column names in setPartitionFilter
+     * when the user has a schema in the load statement which renames partition
+     * columns - in this test case the schema in load statement is a prefix 
+     * (with columns renamed) of the schema returned by 
+     * {@link LoadMetadata#getSchema(String, Configuration)}
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping4() throws Exception {
+        TestLoader.partFilter = null;
+        lpTester.buildPlan("a = load 'foo' using "
+            + TestLoader.class.getName() + 
+            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+            "'srcid,mrkt') as (f1, f2, f3);");
+        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                "(age >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);");
+        lpTester.typeCheckPlan(lp);
+        lpTester.optimizePlan(lp);
+        assertEquals("checking partition filter:",             
+                    "((mrkt == 'us') and (srcid == 10))",
+                    TestLoader.partFilter.toString());
+        LOFilter filter = (LOFilter) lp.getLeaves().get(0);
+        String actual = PColFilterExtractor.getExpression(
+                (ExpressionOperator) filter.getComparisonPlan().
+                getLeaves().get(0)).
+                toString().toLowerCase();
+        assertEquals("checking trimmed filter expression:", 
+                "((age >= 20) and (f3 == 15))", actual);
+    }
+    
+    //// helper methods ///////
+    
+    private PColFilterExtractor test(LogicalPlan lp, List<String> partitionCols, 
+            String expPartFilterString, String expFilterString) 
+    throws FrontendException {
+        LOFilter filter = (LOFilter)lp.getLeaves().get(0);
+        PColFilterExtractor pColExtractor = new PColFilterExtractor(
+                filter.getComparisonPlan(), partitionCols);
+        pColExtractor.visit();
+        
+        if(expPartFilterString == null) {
+            assertEquals("Checking partition column filter:", null, 
+                    pColExtractor.getPColCondition());
+        } else  {
+            assertEquals("Checking partition column filter:", 
+                    expPartFilterString.toLowerCase(), 
+                    pColExtractor.getPColCondition().toString().toLowerCase());   
+        }
+        
+        if(expFilterString == null) {
+            assertTrue("Check that filter can be removed:", 
+                    pColExtractor.isFilterRemovable());
+        } else {
+            String actual = PColFilterExtractor.getExpression(
+                                (ExpressionOperator) filter.getComparisonPlan().
+                                getLeaves().get(0)).
+                                toString().toLowerCase();
+            assertEquals("checking trimmed filter expression:", expFilterString,
+                    actual);
+        }
+        return pColExtractor;
+    }
+    
+    private void negativeTest(LogicalPlan lp, List<String> partitionCols,
+            int expectedErrorCode) {
+        LOFilter filter = (LOFilter)lp.getLeaves().get(0);
+        PColFilterExtractor pColExtractor = new PColFilterExtractor(
+                filter.getComparisonPlan(), partitionCols);
+        try {
+            pColExtractor.visit();
+        } catch(Exception e) {
+            assertEquals("Checking if exception has right error code", 
+                    expectedErrorCode, LogUtils.getPigException(e).getErrorCode());
+            return;
+        }
+        fail("Exception expected!");
+    }
+    
+    /**
+     * this loader is only used to test that parition column filters are given
+     * in the manner expected in terms of column names - hence it does not
+     * implement many of the methods and only implements required ones.
+     */
+    public static class TestLoader extends LoadFunc implements LoadMetadata {
+
+        Schema schema;
+        String[] partCols;
+        static Expression partFilter = null;
+        
+        public TestLoader(String schemaString, String commaSepPartitionCols) 
+        throws ParseException {
+            schema = Util.getSchemaFromString(schemaString);
+            partCols = commaSepPartitionCols.split(",");
+        }
+        
+        @Override
+        public InputFormat getInputFormat() throws IOException {
+            return null;
+        }
+
+        @Override
+        public Tuple getNext() throws IOException {
+            return null;
+        }
+
+        @Override
+        public void prepareToRead(RecordReader reader, PigSplit split)
+                throws IOException {
+        }
+
+        @Override
+        public void setLocation(String location, Job job) throws IOException {
+        }
+
+        @Override
+        public String[] getPartitionKeys(String location, Configuration conf)
+                throws IOException {
+            return partCols;
+        }
+
+        @Override
+        public ResourceSchema getSchema(String location, Configuration conf)
+                throws IOException {
+            return new ResourceSchema(schema);
+        }
+
+        @Override
+        public ResourceStatistics getStatistics(String location,
+                Configuration conf) throws IOException {
+            return null;
+        }
+
+        @Override
+        public void setPartitionFilter(Expression partitionFilter)
+                throws IOException {
+            partFilter = partitionFilter;            
+        }
+        
+    }
+}