You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by an...@apache.org on 2013/11/28 02:29:43 UTC

svn commit: r1546253 - in /pig/trunk: ./ conf/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/optimizer/ src/org/apache/pig/newplan/logical/rules/ test/org/apache/pi...

Author: aniket486
Date: Thu Nov 28 01:29:43 2013
New Revision: 1546253

URL: http://svn.apache.org/r1546253
Log:
PIG-3590: remove PartitionFilterOptimizer from trunk (aniket486)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/src/org/apache/pig/PigConstants.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
    pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java
    pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1546253&r1=1546252&r2=1546253&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Nov 28 01:29:43 2013
@@ -28,6 +28,8 @@ PIG-3419: Pluggable Execution Engine (ac
 
 IMPROVEMENTS
 
+PIG-3590: remove PartitionFilterOptimizer from trunk (aniket486)
+
 PIG-3580: MIN, MAX and AVG functions for BigDecimal and BigInteger (harichinnan via cheolsoo)
 
 PIG-3569: SUM function for BigDecimal and BigInteger (harichinnan via rohini)

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1546253&r1=1546252&r2=1546253&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Thu Nov 28 01:29:43 2013
@@ -225,10 +225,6 @@ pig.location.check.strict=false
 # application master getting restarted.
 # pig.output.committer.recovery.support=true
 
-# Set this option to true if you need to use the old partition filter optimizer. 
-# Note: Old filter optimizer PColFilterOptimizer will be deprecated in the future.
-# pig.exec.useOldPartitionFilterOptimizer=true
-
 # By default, the size of pig script stored in job xml is limited to 10,240
 # characters. This property can be used to configure it.
 # pig.script.max.size=<somevalue>

Modified: pig/trunk/src/org/apache/pig/PigConstants.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConstants.java?rev=1546253&r1=1546252&r2=1546253&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConstants.java (original)
+++ pig/trunk/src/org/apache/pig/PigConstants.java Thu Nov 28 01:29:43 2013
@@ -47,10 +47,4 @@ public class PigConstants {
      */
     public static final String PIG_OPTIMIZER_RULES_DISABLED_KEY = "pig.optimizer.rules.disabled";
 
-    /**
-     * flag to use old PartitionFilterOptimizer in case NewPartitionFilterOptimizer is not backwards compatible
-     * (A known case is "filter a by 1 == 0").
-     */
-    public static final String PIG_EXEC_OLD_PART_FILTER_OPTIMIZER = "pig.exec.useOldPartitionFilterOptimizer";
-
 }
\ No newline at end of file

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1546253&r1=1546252&r2=1546253&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Nov 28 01:29:43 2013
@@ -257,14 +257,6 @@ public abstract class HExecutionEngine i
             disabledOptimizerRules = new HashSet<String>();
         }
 
-        if( ! Boolean.valueOf(this.pigContext.getProperties().getProperty(
-                PigConstants.PIG_EXEC_OLD_PART_FILTER_OPTIMIZER, "false"))){
-            // Turn off the old partition filter optimizer
-            disabledOptimizerRules.add("PartitionFilterOptimizer");
-        } else {
-            disabledOptimizerRules.add("NewPartitionFilterOptimizer");
-        }
-
         String pigOptimizerRulesDisabled = this.pigContext.getProperties()
                 .getProperty(PigConstants.PIG_OPTIMIZER_RULES_DISABLED_KEY);
         if (pigOptimizerRulesDisabled != null) {
@@ -275,7 +267,6 @@ public abstract class HExecutionEngine i
         if (pigContext.inIllustrator) {
             disabledOptimizerRules.add("MergeForEach");
             disabledOptimizerRules.add("PartitionFilterOptimizer");
-            disabledOptimizerRules.add("NewPartitionFilterOptimizer");
             disabledOptimizerRules.add("LimitOptimizer");
             disabledOptimizerRules.add("SplitFilter");
             disabledOptimizerRules.add("PushUpFilter");

Modified: pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java?rev=1546253&r1=1546252&r2=1546253&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java Thu Nov 28 01:29:43 2013
@@ -1,600 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.newplan;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.Expression;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.util.Pair;
-
-import org.apache.pig.Expression.OpType;
-import org.apache.pig.newplan.logical.expression.AddExpression;
-import org.apache.pig.newplan.logical.expression.AndExpression;
-import org.apache.pig.newplan.logical.expression.BinCondExpression;
-import org.apache.pig.newplan.logical.expression.BinaryExpression;
-import org.apache.pig.newplan.logical.expression.CastExpression;
-import org.apache.pig.newplan.logical.expression.ConstantExpression;
-import org.apache.pig.newplan.logical.expression.DereferenceExpression;
-import org.apache.pig.newplan.logical.expression.DivideExpression;
-import org.apache.pig.newplan.logical.expression.EqualExpression;
-import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
-import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
-import org.apache.pig.newplan.logical.expression.IsNullExpression;
-import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
-import org.apache.pig.newplan.logical.expression.LessThanExpression;
-import org.apache.pig.newplan.logical.expression.LogicalExpression;
-import org.apache.pig.newplan.logical.expression.MapLookupExpression;
-import org.apache.pig.newplan.logical.expression.ModExpression;
-import org.apache.pig.newplan.logical.expression.MultiplyExpression;
-import org.apache.pig.newplan.logical.expression.NotEqualExpression;
-import org.apache.pig.newplan.logical.expression.NotExpression;
-import org.apache.pig.newplan.logical.expression.OrExpression;
-import org.apache.pig.newplan.logical.expression.ProjectExpression;
-import org.apache.pig.newplan.logical.expression.RegexExpression;
-import org.apache.pig.newplan.logical.expression.SubtractExpression;
-import org.apache.pig.newplan.logical.expression.UserFuncExpression;
-import org.apache.pig.newplan.DepthFirstWalker;
-
-/**
- * This Visitor works on the filter condition of a LOFilter which immediately
- * follows a LOLoad that interacts with a metadata system (currently OWL) to
- * read table data. The visitor looks for conditions on partition columns in the
- * filter condition and extracts those conditions out of the filter condition.
- * The condition on partition cols will be used to prune partitions of the table.
- *
- */
-@Deprecated
-public class PColFilterExtractor extends PlanVisitor {
-
-    private static final Log LOG = LogFactory.getLog(PColFilterExtractor.class);
-
-    /**
-     * partition columns associated with the table
-     * present in the load on which the filter whose
-     * inner plan is being visited is applied
-     */
-    private List<String> partitionCols;
-
-    /**
-     * will contain the partition column filter conditions
-     * accumulated during the visit - the final condition will an expression
-     * built from these sub expressions connected with AND
-     */
-    private ArrayList<Expression> pColConditions = new ArrayList<Expression>();
-
-    /**
-     * flag used during visit to indicate if a partition key
-     * was seen
-     */
-    private boolean sawKey;
-
-    private boolean sawNonKeyCol;
-
-    private enum Side { LEFT, RIGHT, NONE };
-    private Side replaceSide = Side.NONE;
-
-    private boolean filterRemovable = false;
-
-    private boolean canPushDown = true;
-
-    @Override
-    public void visit() throws FrontendException {
-        // we will visit the leaf and it will recursively walk the plan
-        LogicalExpression leaf = (LogicalExpression)plan.getSources().get( 0 );
-        // if the leaf is a unary operator it should be a FilterFunc in
-        // which case we don't try to extract partition filter conditions
-        if(leaf instanceof BinaryExpression) {
-            BinaryExpression binExpr = (BinaryExpression)leaf;
-            visit( binExpr );
-            replaceChild( binExpr );
-            // if the entire expression is to be removed, then the above
-            // replaceChild will not set sawKey to false (sawKey is set to
-            // false only in replaceChild()
-            if(sawKey == true) {
-                //there are only conditions on partition columns in the filter
-                //extract it
-                pColConditions.add( getExpression( leaf ) );
-                filterRemovable = true;
-            }
-        }
-    }
-
-    /**
-     *
-     * @param plan logical plan corresponding the filter's comparison condition
-     * @param partitionCols list of partition columns of the table which is
-     * being loaded in the LOAD statement which is input to the filter
-     */
-    public PColFilterExtractor(OperatorPlan plan,
-            List<String> partitionCols) {
-        // though we configure a DepthFirstWalker to be the walker, we will not
-        // use it - we will visit the leaf and it will recursively walk the
-        // plan
-        super( plan, new DepthFirstWalker( plan ) );
-        this.partitionCols = new ArrayList<String>(partitionCols);
-    }
-
-    protected void visit(ProjectExpression project) throws FrontendException {
-        String fieldName = project.getFieldSchema().alias;
-        if(partitionCols.contains(fieldName)) {
-            sawKey = true;
-            // The condition on partition column will be used to prune the
-            // scan and removed from the filter condition. Hence the condition
-            // on the partition column will not be re applied when data is read,
-            // so the following cases should throw error until that changes.
-            List<Class<?>> opsToCheckFor = new ArrayList<Class<?>>();
-                        opsToCheckFor.add(UserFuncExpression.class);
-            if(checkSuccessors(project, opsToCheckFor)) {
-                LOG.warn("No partition filter push down: " +
-                    "You have an partition column ("
-                    + fieldName + ") inside a function in the " +
-                    "filter condition.");
-                canPushDown = false;
-                return;
-            }
-            opsToCheckFor.set(0, CastExpression.class);
-            if(checkSuccessors(project, opsToCheckFor)) {
-                LOG.warn("No partition filter push down: " +
-                    "You have an partition column ("
-                    + fieldName + ") inside a cast in the " +
-                    "filter condition.");
-                canPushDown = false;
-                return;
-            }
-            opsToCheckFor.set(0, IsNullExpression.class);
-            if(checkSuccessors(project, opsToCheckFor)) {
-                LOG.warn("No partition filter push down: " +
-                    "You have an partition column ("
-                    + fieldName + ") inside a null check operator in the " +
-                    "filter condition.");
-                canPushDown = false;
-                return;
-            }
-            opsToCheckFor.set(0, BinCondExpression.class);
-            if(checkSuccessors(project, opsToCheckFor)) {
-                LOG.warn("No partition filter push down: " +
-                    "You have an partition column ("
-                    + fieldName + ") inside a bincond operator in the " +
-                    "filter condition.");
-                canPushDown = false;
-                return;
-            }
-        } else {
-            sawNonKeyCol = true;
-        }
-    }
-
-    /**
-     * Detect whether a non-partition column is present in the expression.
-     * @param binOp
-     * @return true or false
-     * @throws FrontendException
-     */
-    private boolean detectNonPartitionColumn(BinaryExpression binOp) throws FrontendException {
-        LogicalExpression lhs = binOp.getLhs();
-        LogicalExpression rhs = binOp.getRhs();
-        if (lhs instanceof ProjectExpression) {
-            String fieldName = ((ProjectExpression)lhs).getFieldSchema().alias;
-            if(!partitionCols.contains(fieldName)) {
-                return true;
-            }
-        }
-        if (rhs instanceof ProjectExpression) {
-            String fieldName = ((ProjectExpression)rhs).getFieldSchema().alias;
-            if(!partitionCols.contains(fieldName)) {
-                return true;
-            }
-        }
-
-        boolean lhsSawNonKeyCol = false;
-        boolean rhsSawNonKeyCol = false;
-        if (lhs instanceof BinaryExpression) {
-            lhsSawNonKeyCol = detectNonPartitionColumn((BinaryExpression)lhs);
-        }
-        if (rhs instanceof BinaryExpression) {
-            rhsSawNonKeyCol = detectNonPartitionColumn((BinaryExpression)rhs);
-        }
-
-        return lhsSawNonKeyCol || rhsSawNonKeyCol;
-    }
-
-    /**
-     * Detect and/or expressions that contain both partition and non-partition
-     * conditions such as '(pcond and non-pcond) or (pcond and non-pcond)'.
-     * @param binOp
-     * @return true or false
-     * @throws FrontendException
-     */
-    private boolean detectAndOrConditionWithMixedColumns(BinaryExpression binOp) throws FrontendException {
-        LogicalExpression lhs = binOp.getLhs();
-        LogicalExpression rhs = binOp.getRhs();
-
-        if ( (binOp instanceof OrExpression) &&
-             ( (lhs instanceof AndExpression && rhs instanceof AndExpression) ||
-               (lhs instanceof OrExpression || rhs instanceof OrExpression) ) ) {
-            return detectNonPartitionColumn(binOp);
-        }
-
-        return false;
-    }
-
-    private void visit(BinaryExpression binOp) throws FrontendException {
-        boolean lhsSawKey = false;
-        boolean rhsSawKey = false;
-        boolean lhsSawNonKeyCol = false;
-        boolean rhsSawNonKeyCol = false;
-        sawKey = false;
-        sawNonKeyCol = false;
-
-        if (detectAndOrConditionWithMixedColumns(binOp)) {
-            sawNonKeyCol = true;
-            // Don't set canPushDown to false. If there are other AND
-            // conditions on a partition column we want to push that down
-            LOG.warn("No partition filter push down: You have partition and non-partition "
-                    + "columns  in a construction like: "
-                    + "(pcond and non-pcond ..) or (pcond and non-pcond ...) "
-                    + "where pcond is a condition on a partition column and "
-                    + "non-pcond is a condition on a non-partition column.");
-            return;
-        }
-
-        visit( binOp.getLhs() );
-        replaceChild(binOp.getLhs());
-        lhsSawKey = sawKey;
-        lhsSawNonKeyCol = sawNonKeyCol;
-
-        sawKey = false;
-        sawNonKeyCol = false;
-        visit( binOp.getRhs() );
-        replaceChild(binOp.getRhs());
-        rhsSawKey = sawKey;
-        rhsSawNonKeyCol = sawNonKeyCol;
-
-        // only in the case of an AND, we potentially split the AND to
-        // remove conditions on partition columns out of the AND. For this
-        // we set replaceSide accordingly so that when we reach a predecessor
-        // we can trim the appropriate side. If both sides of the AND have
-        // conditions on partition columns, we will remove the AND completely -
-        // in this case, we will not set replaceSide, but sawKey will be
-        // true so that as we go to higher predecessor ANDs we can trim later.
-        if(binOp instanceof AndExpression) {
-            if(lhsSawKey && rhsSawNonKeyCol){
-                replaceSide = Side.LEFT;
-            }else if(rhsSawKey && lhsSawNonKeyCol){
-                replaceSide = Side.RIGHT;
-            }
-        } else if(lhsSawKey && rhsSawNonKeyCol || rhsSawKey && lhsSawNonKeyCol){
-            LOG.warn("No partition filter push down: " +
-                "Use of partition column/condition with" +
-                " non partition column/condition in filter expression is not " +
-                "supported.");
-            canPushDown = false;
-        }
-
-        sawKey = lhsSawKey || rhsSawKey;
-        sawNonKeyCol = lhsSawNonKeyCol || rhsSawNonKeyCol;
-    }
-
-    /**
-     * @return the condition on partition columns extracted from filter
-     */
-    public  Expression getPColCondition(){
-        if(!canPushDown || pColConditions.size() == 0)
-            return null;
-        Expression cond =  pColConditions.get(0);
-        for(int i=1; i<pColConditions.size(); i++){
-            //if there is more than one condition expression
-            // connect them using "AND"s
-            cond = new Expression.BinaryExpression(cond, pColConditions.get(i),
-                    OpType.OP_AND);
-        }
-        return cond;
-    }
-
-    /**
-     * @return the filterRemovable
-     */
-    public boolean isFilterRemovable() {
-        return canPushDown && filterRemovable;
-    }
-
-    //////// helper methods /////////////////////////
-    /**
-     * check for the presence of a certain operator type in the Successors
-     * @param opToStartFrom
-     * @param opsToCheckFor operators to be checked for at each level of
-     * Successors - the ordering in the list is the order in which the ops
-     * will be checked.
-     * @return true if opsToCheckFor are found
-     * @throws IOException
-     */
-    private boolean checkSuccessors(Operator opToStartFrom,
-            List<Class<?>> opsToCheckFor) throws FrontendException {
-        boolean done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
-        if(!done && !opsToCheckFor.isEmpty()) {
-            // continue checking if there is more to check
-            while(!done) {
-                opToStartFrom = plan.getPredecessors(opToStartFrom).get(0);
-                done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
-            }
-        }
-        return opsToCheckFor.isEmpty();
-    }
-
-    private boolean checkSuccessorsHelper(Operator opToStartFrom,
-            List<Class<?>> opsToCheckFor) throws FrontendException {
-        List<Operator> successors = plan.getPredecessors(
-                opToStartFrom);
-        if(successors == null || successors.size() == 0) {
-            return true; // further checking cannot be done
-        }
-        if(successors.size() == 1) {
-            Operator suc  = successors.get(0);
-            if(suc.getClass().getCanonicalName().equals(
-                    opsToCheckFor.get(0).getCanonicalName())) {
-                // trim the list of operators to check
-                opsToCheckFor.remove(0);
-                if(opsToCheckFor.isEmpty()) {
-                    return true; //no further checks required
-                }
-            }
-        } else {
-            logInternalErrorAndSetFlag();
-        }
-        return false; // more checking can be done
-    }
-
-    private void replaceChild(LogicalExpression childExpr) throws FrontendException {
-
-        if(replaceSide == Side.NONE) {
-            // the child is trimmed when the appropriate
-            // flag is set to indicate that it needs to be trimmed.
-            return;
-        }
-
-        // eg if replaceSide == Side.LEFT
-        //    binexpop
-        //   /   \ \
-        // child (this is the childExpr argument send in)
-        //  /  \
-        // Lt   Rt
-        //
-        // gets converted to
-        //  binexpop
-        //  /
-        // Rt
-
-        if( !( childExpr instanceof BinaryExpression ) ) {
-             logInternalErrorAndSetFlag();
-             return;
-        }
-        // child's lhs operand
-        LogicalExpression leftChild =
-            ((BinaryExpression)childExpr).getLhs();
-        // child's rhs operand
-        LogicalExpression rightChild =
-            ((BinaryExpression)childExpr).getRhs();
-
-        plan.disconnect( childExpr, leftChild );
-        plan.disconnect( childExpr, rightChild );
-
-        if(replaceSide == Side.LEFT) {
-            // remove left child and replace childExpr with its right child
-            remove( leftChild );
-            replace(childExpr, rightChild);
-        } else if(replaceSide == Side.RIGHT){
-            // remove right child and replace childExpr with its left child
-            remove(rightChild);
-            replace(childExpr, leftChild);
-        } else {
-            logInternalErrorAndSetFlag();
-            return;
-        }
-        //reset
-        replaceSide = Side.NONE;
-        sawKey = false;
-
-    }
-
-    private void replace(Operator oldOp, Operator newOp) throws FrontendException {
-        List<Operator> grandParents = plan.getPredecessors( oldOp );
-        if( grandParents == null || grandParents.size() == 0 ) {
-            plan.remove( oldOp );
-            return;
-        }
-        Operator grandParent = plan.getPredecessors( oldOp ).get( 0 );
-        Pair<Integer, Integer> pair = plan.disconnect( grandParent, oldOp );
-        plan.add( newOp );
-        plan.connect( grandParent, pair.first, newOp, pair.second );
-        plan.remove( oldOp );
-    }
-
-    /**
-     * @param op
-     * @throws IOException
-     * @throws IOException
-     * @throws IOException
-     */
-    private void remove(LogicalExpression op) throws FrontendException {
-        pColConditions.add( getExpression( op ) );
-        removeTree( op );
-    }
-
-    /**
-     * Assume that the given operator is already disconnected from its predecessors.
-     * @param op
-     * @throws FrontendException
-     */
-    private void removeTree(Operator op) throws FrontendException {
-        List<Operator> succs = plan.getSuccessors( op );
-        if( succs == null ) {
-            plan.remove( op );
-            return;
-        }
-
-        Operator[] children = new Operator[succs.size()];
-        for( int i = 0; i < succs.size(); i++ ) {
-            children[i] = succs.get(i);
-        }
-
-        for( Operator succ : children ) {
-            plan.disconnect( op, succ );
-            removeTree( succ );
-        }
-
-        plan.remove( op );
-    }
-
-    public Expression getExpression(LogicalExpression op) throws FrontendException
-     {
-        if(op instanceof ConstantExpression) {
-            ConstantExpression constExpr =(ConstantExpression)op ;
-            return new Expression.Const( constExpr.getValue() );
-        } else if (op instanceof ProjectExpression) {
-            ProjectExpression projExpr = (ProjectExpression)op;
-            String fieldName = projExpr.getFieldSchema().alias;
-            return new Expression.Column(fieldName);
-        } else {
-            if( !( op instanceof BinaryExpression ) ) {
-                logInternalErrorAndSetFlag();
-                return null;
-            }
-            BinaryExpression binOp = (BinaryExpression)op;
-            if(binOp instanceof AddExpression) {
-                return getExpression( binOp, OpType.OP_PLUS );
-            } else if(binOp instanceof SubtractExpression) {
-                return getExpression(binOp, OpType.OP_MINUS);
-            } else if(binOp instanceof MultiplyExpression) {
-                return getExpression(binOp, OpType.OP_TIMES);
-            } else if(binOp instanceof DivideExpression) {
-                return getExpression(binOp, OpType.OP_DIV);
-            } else if(binOp instanceof ModExpression) {
-                return getExpression(binOp, OpType.OP_MOD);
-            } else if(binOp instanceof AndExpression) {
-                return getExpression(binOp, OpType.OP_AND);
-            } else if(binOp instanceof OrExpression) {
-                return getExpression(binOp, OpType.OP_OR);
-            } else if(binOp instanceof EqualExpression) {
-                return getExpression(binOp, OpType.OP_EQ);
-            } else if(binOp instanceof NotEqualExpression) {
-                return getExpression(binOp, OpType.OP_NE);
-            } else if(binOp instanceof GreaterThanExpression) {
-                return getExpression(binOp, OpType.OP_GT);
-            } else if(binOp instanceof GreaterThanEqualExpression) {
-                return getExpression(binOp, OpType.OP_GE);
-            } else if(binOp instanceof LessThanExpression) {
-                return getExpression(binOp, OpType.OP_LT);
-            } else if(binOp instanceof LessThanEqualExpression) {
-                return getExpression(binOp, OpType.OP_LE);
-            } else if(binOp instanceof RegexExpression) {
-                return getExpression(binOp, OpType.OP_MATCH);
-            } else {
-                logInternalErrorAndSetFlag();
-            }
-        }
-        return null;
-    }
-
-    private Expression getExpression(BinaryExpression binOp, OpType
-            opType) throws FrontendException {
-        return new Expression.BinaryExpression(getExpression(binOp.getLhs())
-                , getExpression(binOp.getRhs()), opType);
-    }
-
-    private void logInternalErrorAndSetFlag() throws FrontendException {
-        LOG.warn("No partition filter push down: "
-                + "Internal error while processing any partition filter "
-                + "conditions in the filter after the load");
-        canPushDown = false;
-    }
-
-    // this might get called from some visit() - in that case, delegate to
-    // the other visit()s which we have defined here
-    private void visit(LogicalExpression op) throws FrontendException {
-        if(op instanceof ProjectExpression) {
-            visit((ProjectExpression)op);
-        } else if (op instanceof BinaryExpression) {
-            visit((BinaryExpression)op);
-        } else if (op instanceof CastExpression) {
-            visit((CastExpression)op);
-        } else if (op instanceof BinCondExpression) {
-            visit((BinCondExpression)op);
-        } else if (op instanceof UserFuncExpression) {
-            visit((UserFuncExpression)op);
-        } else if (op instanceof IsNullExpression) {
-            visit((IsNullExpression)op);
-        } else if( op instanceof NotExpression ) {
-            visit( (NotExpression)op );
-        } else if( op instanceof RegexExpression ) {
-            visit( (RegexExpression)op );
-        } else if (op instanceof MapLookupExpression) {
-            visit((MapLookupExpression) op);
-        } else if (op instanceof DereferenceExpression) {
-            visit((DereferenceExpression) op);
-        }
-    }
-
-    // some specific operators which are of interest to catch some
-    // unsupported scenarios
-    private void visit(CastExpression cast) throws FrontendException {
-        visit(cast.getExpression());
-    }
-
-    private void visit(NotExpression not) throws FrontendException {
-        visit(not.getExpression());
-    }
-
-    private void visit(RegexExpression regexp) throws FrontendException {
-        visit((BinaryExpression)regexp);
-    }
-
-    private void visit(BinCondExpression binCond) throws FrontendException {
-        visit(binCond.getCondition());
-        visit(binCond.getLhs());
-        visit(binCond.getRhs());
-    }
-
-    private void visit(UserFuncExpression udf) throws FrontendException {
-        for (LogicalExpression op : udf.getArguments()) {
-            visit(op);
-        }
-    }
-
-    private void visit(IsNullExpression isNull) throws FrontendException {
-        visit(isNull.getExpression());
-    }
-
-    private void visit(MapLookupExpression mapLookup) throws FrontendException {
-        visit(mapLookup.getMap());
-    }
-
-    private void visit(DereferenceExpression deref) throws FrontendException {
-        visit(deref.getReferredExpression());
-    }
-
-    public boolean canPushDown() {
-        return canPushDown;
-    }
-
-}

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1546253&r1=1546252&r2=1546253&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Thu Nov 28 01:29:43 2013
@@ -127,15 +127,6 @@ public class LogicalPlanOptimizer extend
         // This set of rules push partition filter to LoadFunc
         s = new HashSet<Rule>();
         // Optimize partition filter
-        r = new PartitionFilterOptimizer("NewPartitionFilterOptimizer");
-        checkAndAddRule(s, r);
-        if (!s.isEmpty())
-            ls.add(s);
-
-        // Partition filter set
-        // This set of rules push partition filter to LoadFunc
-        s = new HashSet<Rule>();
-        // Optimize partition filter
         r = new PartitionFilterOptimizer("PartitionFilterOptimizer");
         checkAndAddRule(s, r);
         if (!s.isEmpty())

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=1546253&r1=1546252&r2=1546253&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Thu Nov 28 01:29:43 2013
@@ -35,7 +35,6 @@ import org.apache.pig.newplan.FilterExtr
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.OperatorSubPlan;
-import org.apache.pig.newplan.PColFilterExtractor;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.relational.LOFilter;
 import org.apache.pig.newplan.logical.relational.LOLoad;
@@ -96,43 +95,7 @@ public class PartitionFilterOptimizer ex
 
     @Override
     public Transformer getNewTransformer() {
-        if(name.equals("PartitionFilterOptimizer")) {
-            return new PartitionFilterPushDownTransformer();
-        } else {
-            return new NewPartitionFilterPushDownTransformer();
-        }
-    }
-
-    public class NewPartitionFilterPushDownTransformer extends PartitionFilterPushDownTransformer {
-        @Override
-        public void transform(OperatorPlan matched) throws FrontendException {
-            subPlan = new OperatorSubPlan( currentPlan );
-
-            setupColNameMaps();
-
-            FilterExtractor filterFinder = new FilterExtractor(
-                    loFilter.getFilterPlan(), getMappedKeys( partitionKeys ) );
-            filterFinder.visit();
-            Expression partitionFilter = filterFinder.getPColCondition();
-
-            if(partitionFilter != null) {
-                // the column names in the filter may be the ones provided by
-                // the user in the schema in the load statement - we may need
-                // to replace them with partition column names as given by
-                // LoadFunc.getSchema()
-                updateMappedColNames(partitionFilter);
-                try {
-                    loadMetadata.setPartitionFilter(partitionFilter);
-                } catch (IOException e) {
-                    throw new FrontendException( e );
-                }
-                if(filterFinder.isFilterRemovable()) {
-                    currentPlan.removeAndReconnect( loFilter );
-                } else {
-                    loFilter.setFilterPlan(filterFinder.getFilteredPlan());
-                }
-            }
-        }
+        return new PartitionFilterPushDownTransformer();
     }
 
     public class PartitionFilterPushDownTransformer extends Transformer {
@@ -182,33 +145,26 @@ public class PartitionFilterOptimizer ex
 
         	setupColNameMaps();
         	
-        	// PIG-1871: Don't throw exception if partition filters cannot be pushed up. 
-        	// Perform transformation on a copy of the filter plan, and replace the 
-        	// original filter plan only if the transformation is successful 
-        	// (i.e. partition filter can be pushed down) 
-        	LogicalExpressionPlan filterExpr = loFilter.getFilterPlan();
-        	LogicalExpressionPlan filterExprCopy = filterExpr.deepCopy();
-        	
-        	PColFilterExtractor pColFilterFinder = new PColFilterExtractor(
-        	        filterExprCopy, getMappedKeys( partitionKeys ) );
-        	pColFilterFinder.visit();
-        	Expression partitionFilter = pColFilterFinder.getPColCondition();
-        	
-        	if(partitionFilter != null) {
-        		// the column names in the filter may be the ones provided by
-        		// the user in the schema in the load statement - we may need
-        		// to replace them with partition column names as given by
-        		// LoadFunc.getSchema()
-        		updateMappedColNames(partitionFilter);
-        		try {
-					loadMetadata.setPartitionFilter(partitionFilter);
-				} catch (IOException e) {
-					throw new FrontendException( e );
-				}
-        		if(pColFilterFinder.isFilterRemovable()) {  
-        			currentPlan.removeAndReconnect( loFilter );
-        		} else {
-                    loFilter.setFilterPlan(filterExprCopy);
+        	FilterExtractor filterFinder = new FilterExtractor(
+                    loFilter.getFilterPlan(), getMappedKeys( partitionKeys ) );
+            filterFinder.visit();
+            Expression partitionFilter = filterFinder.getPColCondition();
+
+            if(partitionFilter != null) {
+                // the column names in the filter may be the ones provided by
+                // the user in the schema in the load statement - we may need
+                // to replace them with partition column names as given by
+                // LoadFunc.getSchema()
+                updateMappedColNames(partitionFilter);
+                try {
+                    loadMetadata.setPartitionFilter(partitionFilter);
+                } catch (IOException e) {
+                    throw new FrontendException( e );
+                }
+                if(filterFinder.isFilterRemovable()) {
+                    currentPlan.removeAndReconnect( loFilter );
+                } else {
+                    loFilter.setFilterPlan(filterFinder.getFilteredPlan());
                 }
             }
         }

Modified: pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java?rev=1546253&r1=1546252&r2=1546253&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java Thu Nov 28 01:29:43 2013
@@ -798,7 +798,7 @@ public class TestNewPartitionFilterPushD
 
             Set<Rule> s = new HashSet<Rule>();
             // add split filter rule
-            Rule r = new PartitionFilterOptimizer("NewPartitionFilterPushDown");
+            Rule r = new PartitionFilterOptimizer("PartitionFilterPushDown");
             s = new HashSet<Rule>();
             s.add(r);
             ls.add(s);

Modified: pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java?rev=1546253&r1=1546252&r2=1546253&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java Thu Nov 28 01:29:43 2013
@@ -1,995 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import junit.framework.AssertionFailedError;
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.pig.ExecType;
-import org.apache.pig.Expression;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadMetadata;
-import org.apache.pig.PigServer;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceStatistics;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.newplan.logical.expression.LogicalExpression;
-import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
-import org.apache.pig.newplan.logical.relational.LOFilter;
-import org.apache.pig.newplan.logical.relational.LogicalPlan;
-import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer;
-import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
-import org.apache.pig.newplan.Operator;
-import org.apache.pig.newplan.OperatorPlan;
-import org.apache.pig.newplan.OperatorSubPlan;
-import org.apache.pig.newplan.PColFilterExtractor;
-import org.apache.pig.newplan.optimizer.PlanOptimizer;
-import org.apache.pig.newplan.optimizer.Rule;
-import org.apache.pig.newplan.optimizer.Transformer;
-import org.apache.pig.parser.ParserException;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.impl.util.Utils;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * unit tests to test extracting partition filter conditions out of the filter
- * condition in the filter following a load which talks to metadata system (.i.e.
- * implements {@link LoadMetadata})
- */
-@Deprecated
-public class TestPartitionFilterPushDown {
-    static PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
-    String query = "a = load 'foo' as (srcid:int, mrkt:chararray, dstid:int, name:chararray, " +
-    		"age:int, browser:map[], location:tuple(country:chararray, zip:int));";
-
-    @BeforeClass
-    public static void setup() throws Exception {
-    }
-
-    @AfterClass
-    public static void tearDown() {
-    }
-
-    /**
-     * test case where there is a single expression on partition columns in
-     * the filter expression along with an expression on non partition column
-     * @throws Exception
-     */
-    @Test
-    public void testSimpleMixed() throws Exception {
-        String q = query + "b = filter a by srcid == 10 and name == 'foo';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid"), "(srcid == 10)", "(name == 'foo')");
-    }
-
-    /**
-     * test case where filter does not contain any condition on partition cols
-     * @throws Exception
-     */
-    @Test
-    public void testNoPartFilter() throws Exception {
-        String q = query + "b = filter a by age == 20 and name == 'foo';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid"), null,
-        "((age == 20) and (name == 'foo'))");
-    }
-
-    /**
-     * test case where filter only contains condition on partition cols
-     * @throws Exception
-     */
-    @Test
-    public void testOnlyPartFilter1() throws Exception {
-        String q = query + "b = filter a by srcid > 20 and mrkt == 'us';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt"),
-                "((srcid > 20) and (mrkt == 'us'))", null);
-
-    }
-
-    /**
-     * test case where filter only contains condition on partition cols
-     * @throws Exception
-     */
-    @Test
-    public void testOnlyPartFilter2() throws Exception {
-        String q = query + "b = filter a by mrkt == 'us';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt"),
-                "(mrkt == 'us')", null);
-
-    }
-
-    /**
-     * test case where filter only contains condition on partition cols
-     * @throws Exception
-     */
-    @Test
-    public void testOnlyPartFilter3() throws Exception {
-        String q = query + "b = filter a by srcid == 20 or mrkt == 'us';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt"),
-                "((srcid == 20) or (mrkt == 'us'))", null);
-
-    }
-
-    /**
-     * test case where filter has both conditions on partition cols and non
-     * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns
-     */
-    @Test
-    public void testMixed1() throws Exception {
-        String q = query + "b = filter a by " +
-        "(age < 20 and  mrkt == 'us') and (srcid == 10 and " +
-        "name == 'foo');" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt"),
-                "((mrkt == 'us') and (srcid == 10))",
-        "((age < 20) and (name == 'foo'))");
-    }
-
-
-    /**
-     * test case where filter has both conditions on partition cols and non
-     * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns
-     */
-    @Test
-    public void testMixed2() throws Exception {
-        String q = query + "b = filter a by " +
-        "(age >= 20 and  mrkt == 'us') and (srcid == 10 and " +
-        "dstid == 15);" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid", "mrkt"),
-                "((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))",
-        "(age >= 20)");
-    }
-
-    /**
-     * test case where filter has both conditions on partition cols and non
-     * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns
-     */
-    @Test
-    public void testMixed3() throws Exception {
-        String q = query + "b = filter a by " +
-        "age >= 20 and  mrkt == 'us' and srcid == 10;" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid", "mrkt"),
-                "((mrkt == 'us') and (srcid == 10))", "(age >= 20)");
-    }
-
-    /**
-     * test case where filter has both conditions on partition cols and non
-     * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns - this testcase also has a condition
-     * based on comparison of two partition columns
-     */
-    @Test
-    public void testMixed4() throws Exception {
-        String q = query + "b = filter a by " +
-        "age >= 20 and  mrkt == 'us' and name == 'foo' and " +
-        "srcid == dstid;" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid", "mrkt"),
-                "((mrkt == 'us') and (srcid == dstid))",
-        "((age >= 20) and (name == 'foo'))");
-    }
-
-    /**
-     * test case where filter has both conditions on partition cols and non
-     * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns -
-     * This testcase has two partition col conditions  with OR +  non parition
-     * col conditions
-     */
-    @Test
-    public void testMixed5() throws Exception {
-        String q = query + "b = filter a by " +
-        "(srcid == 10 or mrkt == 'us') and name == 'foo' and " +
-        "dstid == 30;" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid", "mrkt"),
-                "(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))",
-        "(name == 'foo')");
-    }
-
-    /**
-     * test case where filter has both conditions on partition cols and non
-     * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns -
-     * This testcase has two partition col conditions  with OR +  non parition
-     * col conditions
-     */
-    @Test
-    public void testMixed6() throws Exception {
-        String q = query + "b = filter a by " +
-        "dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid", "mrkt"),
-                "((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))",
-        "(name == 'foo')");
-    }
-
-    @Test
-    public void test7() throws Exception {
-        String query = "a = load 'foo' using " + TestLoader.class.getName() +
-            "('srcid, mrkt, dstid, name, age', 'srcid, name');" +
-            "b = filter a by (srcid < 20 and age < 30) or (name == 'foo' and age > 40);" +
-            "store b into 'output';";
-        LogicalPlan plan = buildPlan(new PigServer(pc), query);
-
-        Rule rule = new PartitionFilterOptimizer("test");
-        List<OperatorPlan> matches = rule.match(plan);
-        if (matches != null) {
-            Transformer transformer = rule.getNewTransformer();
-            for (OperatorPlan m : matches) {
-                if (transformer.check(m)) {
-                    transformer.transform(m);
-                }
-            }
-            OperatorSubPlan newPlan = (OperatorSubPlan)transformer.reportChanges();
-
-            Assert.assertTrue(newPlan.getBasePlan().isEqual(plan));
-        }
-
-    }
-
-    @Test
-    public void test8() throws Exception {
-        String query = "a = load 'foo' using " + TestLoader.class.getName() +
-            "('srcid, mrkt, dstid, name, age', 'srcid,name');" +
-            "b = filter a by (srcid < 20) or (name == 'foo');" +
-            "store b into 'output';";
-        LogicalPlan plan = Util.buildLp(new PigServer(pc), query);
-
-        Rule rule = new PartitionFilterOptimizer("test");
-        List<OperatorPlan> matches = rule.match(plan);
-        if (matches != null) {
-            Transformer transformer = rule.getNewTransformer();
-            for (OperatorPlan m : matches) {
-                if (transformer.check(m)) {
-                    transformer.transform(m);
-                }
-            }
-            OperatorSubPlan newPlan = (OperatorSubPlan)transformer.reportChanges();
-
-            Assert.assertTrue(newPlan.getBasePlan().size() == 3);
-        }
-
-    }
-
-
-    /**
-     * test case where filter has both conditions on partition cols and non
-     * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns. This testcase also tests arithmetic
-     * in partition column conditions
-     */
-    @Test
-    public void testMixedArith() throws Exception {
-        String q = query + "b = filter a by " +
-        "mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid", "mrkt"),
-                "((mrkt == 'us') and ((srcid * 10) == (150 + 20)))",
-        "(age != 15)");
-    }
-
-    /**
-     * test case where there is a single expression on partition columns in the
-     * filter expression along with an expression on non partition column of
-     * type map
-     * @throws Exception
-     */
-    @Test
-    public void testMixedNonPartitionTypeMap() throws Exception {
-        String q = query + "b = filter a by srcid == 10 and browser#'type' == 'IE';" +
-                "store b into 'out';";
-        test(q, Arrays.asList("srcid"), "(srcid == 10)");
-
-        q = query + "b = filter a by srcid == 10 and browser#'type' == 'IE' and " +
-                "browser#'version'#'major' == '8.0';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid"), "(srcid == 10)");
-
-        // Some complex partition filters with a non-partition filter
-        q = query + "b = filter a by srcid == 10 and mrkt > '1' and mrkt < '5';" +
-                "c = filter b by browser#'type' == 'IE';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt"),
-                "(((srcid == 10) and (mrkt > '1')) and (mrkt < '5'))");
-
-        q = query + "b = filter a by srcid == 10 and (mrkt > '1' and mrkt < '5');" +
-                "c = filter b by browser#'type' == 'IE';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt"),
-                "((srcid == 10) and ((mrkt > '1') and (mrkt < '5')))");
-    }
-
-    /**
-     * test case where there is a single expression on partition columns in the
-     * filter expression along with an expression on non partition column of
-     * type tuple
-     * @throws Exception
-     */
-    @Test
-    public void testMixedNonPartitionTypeTuple() throws Exception {
-        String q = query + "b = filter a by srcid == 10 and location.country == 'US';" +
-                "store b into 'out';";
-        test(q, Arrays.asList("srcid"), "(srcid == 10)");
-    }
-
-    @Test
-    public void testAndORConditionPartitionKeyCol() throws Exception {
-        // Case of AND and OR
-        String q = query + "b = filter a by (srcid == 10 and dstid == 5) " +
-                "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7);" +
-                "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid"),
-                "((((srcid == 10) and (dstid == 5)) " +
-                "or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
-                null);
-
-        // Additional filter on non-partition key column
-        q = query +
-                "b = filter a by ((srcid == 10 and dstid == 5) " +
-                "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
-                "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid"),
-                "((((srcid == 10) and (dstid == 5)) " +
-                "or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
-                "(mrkt == 'US')");
-
-        // partition key col but null condition which should not become part of
-        // the pushed down filter
-        q = query + "b = filter a by (srcid is null and dstid == 5) " +
-                "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7);" +
-                "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid"), null);
-
-        // Case of OR and AND
-        q = query +
-                "b = filter a by (mrkt == 'US' or mrkt == 'UK') and (srcid == 11 or srcid == 10);" +
-                "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt"),
-                "(((mrkt == 'US') or (mrkt == 'UK')) and ((srcid == 11) or (srcid == 10)))", null);
-        q = query +
-                "b = filter a by (mrkt == 'US' or mrkt == 'UK') and (srcid == 11 or srcid == 10) and dstid == 10;" +
-                "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt"),
-                "(((mrkt == 'US') or (mrkt == 'UK')) and ((srcid == 11) or (srcid == 10)))",
-                "(dstid == 10)");
-    }
-
-    @Test
-    public void testAndORConditionMixedCol() throws Exception {
-        // Case of AND and OR with partition key and non-partition key columns
-        String q = query + "b = filter a by (srcid == 10 and dstid == 5) " +
-                "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7) " +
-                "or (srcid == 13 and dstid == 8);" +
-                "store b into 'out';";
-        test(q, Arrays.asList("srcid"), null,
-                "(((((srcid == 10) and (dstid == 5)) " +
-                "or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7))) " +
-                "or ((srcid == 13) and (dstid == 8)))");
-
-        // Additional filter on a partition key column
-        q = query +
-                "b = filter a by ((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
-                "or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
-                "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt"), "(mrkt == 'US')",
-                "((((srcid == 10) and (dstid == 5)) or ((srcid == 11) and (dstid == 6))) " +
-                "or ((srcid == 12) and (dstid == 7)))");
-
-        q = query + "b = filter a by (mrkt == 'US' or mrkt == 'UK') and " +
-                "((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
-                "or (srcid == 12 and dstid == 7));" +
-                "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt"), "((mrkt == 'US') or (mrkt == 'UK'))",
-                "((((srcid == 10) and (dstid == 5)) or ((srcid == 11) and (dstid == 6))) " +
-                "or ((srcid == 12) and (dstid == 7)))");
-
-        // Additional filter on a non-partition key column
-        q = query +
-                "b = filter a by ((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
-                "or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
-                "store b into 'out';";
-        test(q, Arrays.asList("srcid"), null,
-                "(((((srcid == 10) and (dstid == 5)) or ((srcid == 11) and (dstid == 6))) " +
-                "or ((srcid == 12) and (dstid == 7))) and (mrkt == 'US'))");
-
-        // Case of OR and AND
-        q = query +
-                "b = filter a by (mrkt == 'US' or mrkt == 'UK') and " +
-                "(srcid == 11 or srcid == 10) and (dstid == 5 or dstid == 6);" +
-                "store b into 'out';";
-        test(q, Arrays.asList("srcid"),
-                "((srcid == 11) or (srcid == 10))",
-                "(((mrkt == 'US') or (mrkt == 'UK')) and ((dstid == 5) or (dstid == 6)))");
-        test(q, Arrays.asList("mrkt"),
-                "((mrkt == 'US') or (mrkt == 'UK'))",
-                "(((srcid == 11) or (srcid == 10)) and ((dstid == 5) or (dstid == 6)))");
-    }
-
-    public void testNegPColConditionWithNonPCol() throws Exception {
-        // use of partition column condition and non partition column in
-        // same condition should fail
-        String q = query + "b = filter a by " +
-        "srcid > age;" + "store b into 'out';";
-        negativeTest(q, Arrays.asList("srcid"), 1111);
-        q = query + "b = filter a by " +
-        "srcid + age == 20;" + "store b into 'out';";
-        negativeTest(q, Arrays.asList("srcid"), 1111);
-
-        // OR of partition column condition and non partiton col condition
-        // should fail
-        q = query + "b = filter a by " +
-        "srcid > 10 or name == 'foo';" +
-        "store b into 'out';";
-        negativeTest(q, Arrays.asList("srcid"), 1111);
-    }
-
-    @Test
-    public void testNegPColInWrongPlaces() throws Exception {
-        int expectedErrCode = 1112;
-
-        String q = query + "b = filter a by " +
-        "(srcid > 10 and name == 'foo') or dstid == 10;" + "store b into 'out';";
-        negativeTest(q, Arrays.asList("srcid", "dstid"), expectedErrCode);
-
-        expectedErrCode = 1110;
-        q = query + "b = filter a by " +
-        "CONCAT(mrkt, '_10') == 'US_10' and age == 20;" + "store b into 'out';";
-        negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
-
-        q = query + "b = filter a by " +
-        "mrkt matches '.*us.*' and age < 15;" + "store b into 'out';";
-        negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
-
-        q = query + "b = filter a by " +
-        "(int)mrkt == 10 and name matches '.*foo.*';" + "store b into 'out';";
-        negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"),expectedErrCode);
-
-        q = query + "b = filter a by " +
-        "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';" + "store b into 'out';";
-        negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
-
-        q = query + "b = filter a by " +
-        "(mrkt is null) and name matches '.*foo.*';" + "store b into 'out';";
-        negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
-
-        q = query + "b = filter a by " +
-        "(mrkt is not null) and name matches '.*foo.*';" + "store b into 'out';";
-        negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
-    }
-
-//    @Test
-//    public void testNegPColInWrongPlaces2() throws Exception {
-//
-//        LogicalPlanTester tester = new LogicalPlanTester(pc);
-//        tester.buildPlan("a = load 'foo' using " + TestLoader.class.getName()
-//                + "('srcid, mrkt, dstid, name, age', 'srcid,dstid,mrkt');");
-//
-//        org.apache.pig.impl.logicalLayer.LogicalPlan lp = tester
-//                .buildPlan("b = filter a by "
-//                        + "(srcid > 10 and name == 'foo') or dstid == 10;");
-//        negativeTest(lp);
-//
-//        lp = tester.buildPlan("b = filter a by " +
-//                "CONCAT(mrkt, '_10') == 'US_10' and age == 20;");
-//        negativeTest(lp);
-//
-//        lp = tester.buildPlan("b = filter a by " +
-//                "mrkt matches '.*us.*' and age < 15;");
-//        negativeTest(lp);
-//
-//        lp = tester.buildPlan("b = filter a by " +
-//                "(int)mrkt == 10 and name matches '.*foo.*';");
-//        negativeTest(lp);
-//
-//        lp = tester.buildPlan("b = filter a by " +
-//            "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';");
-//        negativeTest(lp);
-//
-//        lp = tester.buildPlan("b = filter a by " +
-//            "(mrkt is null) and name matches '.*foo.*';");
-//        negativeTest(lp);
-//
-//        lp = tester.buildPlan("b = filter a by " +
-//            "(mrkt is not null) and name matches '.*foo.*';");
-//        negativeTest(lp);
-//    }
-
-
-    /**
-     * Test that pig sends correct partition column names in setPartitionFilter
-     * when the user has a schema in the load statement which renames partition
-     * columns
-     * @throws Exception
-     */
-    @Test
-    public void testColNameMapping1() throws Exception {
-        TestLoader.partFilter = null;
-        String q = "a = load 'foo' using "
-            + TestLoader.class.getName() +
-            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
-            "'srcid,mrkt') as (f1, f2, f3, f4, f5);" +
-            "b = filter a by " +
-            "(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" +
-            "store b into 'out';";
-
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
-
-        Assert.assertEquals("checking partition filter:",
-                "((mrkt == 'us') and (srcid == 10))",
-                TestLoader.partFilter.toString());
-        Operator op = newLogicalPlan.getSinks().get(0);
-        LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
-
-        PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
-
-        String actual = extractor.getExpression(
-                (LogicalExpression)filter.getFilterPlan().getSources().get(0)).
-                toString().toLowerCase();
-        Assert.assertEquals("checking trimmed filter expression:",
-                "((f5 >= 20) and (f3 == 15))", actual);
-    }
-
-    private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
-        PigServer pigServer = new PigServer( pc );
-        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
-        optimizer.optimize();
-        return newLogicalPlan;
-    }
-
-
-    /**
-     * Test that pig sends correct partition column names in setPartitionFilter
-     * when the user has a schema in the load statement which renames partition
-     * columns - in this test case there is no condition on partition columns
-     * - so setPartitionFilter() should not be called and the filter condition
-     * should remain as is.
-     * @throws Exception
-     */
-    @Test
-    public void testColNameMapping2() throws Exception {
-        TestLoader.partFilter = null;
-        String q = "a = load 'foo' using "
-            + TestLoader.class.getName() +
-            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
-            "'srcid') as (f1, f2, f3, f4, f5);" +
-            "b = filter a by " +
-            "f5 >= 20 and f2 == 'us' and f3 == 15;" +
-            "store b into 'out';";
-
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
-
-        Assert.assertEquals("checking partition filter:",
-                null,
-                TestLoader.partFilter);
-        Operator op = newLogicalPlan.getSinks().get(0);
-        LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
-
-        PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
-
-        String actual = extractor.getExpression(
-                (LogicalExpression) filter.getFilterPlan().
-                getSources().get(0)).
-                toString().toLowerCase();
-        Assert.assertEquals("checking trimmed filter expression:",
-                "(((f5 >= 20) and (f2 == 'us')) and (f3 == 15))", actual);
-    }
-
-    /**
-     * Test that pig sends correct partition column names in setPartitionFilter
-     * when the user has a schema in the load statement which renames partition
-     * columns - in this test case the filter only has conditions on partition
-     * columns
-     * @throws Exception
-     */
-    @Test
-    public void testColNameMapping3() throws Exception {
-        TestLoader.partFilter = null;
-        String query = "a = load 'foo' using "
-            + TestLoader.class.getName() +
-            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
-            "'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);" +
-            "b = filter a by " +
-            "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);" +
-            "store b into 'out';";
-
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
-        Assert.assertEquals("checking partition filter:",
-                "(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " +
-                "(dstid == 15)))",
-                TestLoader.partFilter.toString());
-        Iterator<Operator> it = newLogicalPlan.getOperators();
-        while( it.hasNext() ) {
-	        Assert.assertFalse("Checking that filter has been removed since it contained" +
-	                " only conditions on partition cols:",
-	                (it.next() instanceof LOFilter));
-        }
-    }
-
-    /**
-     * Test that pig sends correct partition column names in setPartitionFilter
-     * when the user has a schema in the load statement which renames partition
-     * columns - in this test case the schema in load statement is a prefix
-     * (with columns renamed) of the schema returned by
-     * {@link LoadMetadata#getSchema(String, Configuration)}
-     * @throws Exception
-     */
-    @Test
-    public void testColNameMapping4() throws Exception {
-        TestLoader.partFilter = null;
-        String q = "a = load 'foo' using "
-            + TestLoader.class.getName() +
-            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
-            "'srcid,mrkt') as (f1:int, f2:chararray, f3:int, name:chararray, age:int);" +
-            "b = filter a by " +
-            "(age >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" + "store b into 'out';";
-
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
-
-        Assert.assertEquals("checking partition filter:",
-                "((mrkt == 'us') and (srcid == 10))",
-                TestLoader.partFilter.toString());
-        Operator op = newLogicalPlan.getSinks().get(0);
-        LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
-
-        PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
-
-        String actual = extractor.getExpression(
-                (LogicalExpression) filter.getFilterPlan().getSources().get(0)).
-                toString().toLowerCase();
-        Assert.assertEquals("checking trimmed filter expression:",
-                "((age >= 20) and (f3 == 15))", actual);
-    }
-
-    /**
-     * Test PIG-1267
-     * @throws Exception
-     */
-    @Test
-    public void testColNameMapping5() throws Exception {
-        TestLoader.partFilter = null;
-        String q = "a = load 'foo' using "
-            + TestLoader.class.getName() +
-            "('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " +
-            "'srcid');" +
-            "b = load 'bar' using "
-            + TestLoader.class.getName() +
-            "('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," +
-            "'srcid');" +
-            "a1 = filter a by srcid == 10;" +
-            "b1 = filter b by srcid == 20;"+
-            "c = join a1 by bcookie, b1 by bcookie;" +
-            "d = foreach c generate $4 as bcookie:chararray, " +
-            "$5 as dstid:int, $0 as mrkt:chararray;" +
-            "store d into 'out';";
-
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
-
-        String partFilter = TestLoader.partFilter.toString();
-        Assert.assertTrue( "(srcid == 20)".equals( partFilter ) ||  "(srcid == 10)".equals( partFilter ) );
-
-        int counter = 0;
-        Iterator<Operator> iter = newLogicalPlan.getOperators();
-        while (iter.hasNext()) {
-            Assert.assertTrue(!(iter.next() instanceof LOFilter));
-            counter++;
-        }
-        Assert.assertEquals(counter, 5);
-    }
-
-    /**
-     * Test PIG-2778 Add matches operator to predicate pushdown
-     * @throws Exception
-     */
-    @Test
-    public void testMatchOpPushDown() throws Exception {
-        // regexp condition on a partition col
-        String q = query + "b = filter a by name matches 'foo*';" + "store b into 'out';";
-        test(q, Arrays.asList("name"), "(name matches 'foo*')", null);
-
-        // regexp condition on a non-partition col
-        q = query + "b = filter a by name matches 'foo*';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid"), null, "(name matches 'foo*')");
-    }
-
-    /**
-     * Test PIG-3395 Large filter expression makes Pig hang
-     * @throws Exception
-     */
-    @Test
-    public void testLargeAndOrCondition() throws Exception {
-        String q = query + "b = filter a by " +
-                "(srcid == 1 and mrkt == '2' and dstid == 3) " +
-                "or (srcid == 4 and mrkt == '5' and dstid == 6) " +
-                "or (srcid == 7 and mrkt == '8' and dstid == 9) " +
-                "or (srcid == 10 and mrkt == '11' and dstid == 12) " +
-                "or (srcid == 13 and mrkt == '14' and dstid == 15) " +
-                "or (srcid == 16 and mrkt == '17' and dstid == 18) " +
-                "or (srcid == 19 and mrkt == '20' and dstid == 21) " +
-                "or (srcid == 22 and mrkt == '23' and dstid == 24) " +
-                "or (srcid == 25 and mrkt == '26' and dstid == 27) " +
-                "or (srcid == 28 and mrkt == '29' and dstid == 30) " +
-                "or (srcid == 31 and mrkt == '32' and dstid == 33) " +
-                "or (srcid == 34 and mrkt == '35' and dstid == 36) " +
-                "or (srcid == 37 and mrkt == '38' and dstid == 39) " +
-                "or (srcid == 40 and mrkt == '41' and dstid == 42) " +
-                "or (srcid == 43 and mrkt == '44' and dstid == 45) " +
-                "or (srcid == 46 and mrkt == '47' and dstid == 48) " +
-                "or (srcid == 49 and mrkt == '50' and dstid == 51) " +
-                "or (srcid == 52 and mrkt == '53' and dstid == 54) " +
-                "or (srcid == 55 and mrkt == '56' and dstid == 57) " +
-                "or (srcid == 58 and mrkt == '59' and dstid == 60) " +
-                "or (srcid == 61 and mrkt == '62' and dstid == 63) " +
-                "or (srcid == 64 and mrkt == '65' and dstid == 66) " +
-                "or (srcid == 67 and mrkt == '68' and dstid == 69);" +
-                "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt", "dstid"),
-                "(((((((((((((((((((((((((srcid == 1) and (mrkt == '2')) and (dstid == 3)) " +
-                "or (((srcid == 4) and (mrkt == '5')) and (dstid == 6))) " +
-                "or (((srcid == 7) and (mrkt == '8')) and (dstid == 9))) " +
-                "or (((srcid == 10) and (mrkt == '11')) and (dstid == 12))) " +
-                "or (((srcid == 13) and (mrkt == '14')) and (dstid == 15))) " +
-                "or (((srcid == 16) and (mrkt == '17')) and (dstid == 18))) " +
-                "or (((srcid == 19) and (mrkt == '20')) and (dstid == 21))) " +
-                "or (((srcid == 22) and (mrkt == '23')) and (dstid == 24))) " +
-                "or (((srcid == 25) and (mrkt == '26')) and (dstid == 27))) " +
-                "or (((srcid == 28) and (mrkt == '29')) and (dstid == 30))) " +
-                "or (((srcid == 31) and (mrkt == '32')) and (dstid == 33))) " +
-                "or (((srcid == 34) and (mrkt == '35')) and (dstid == 36))) " +
-                "or (((srcid == 37) and (mrkt == '38')) and (dstid == 39))) " +
-                "or (((srcid == 40) and (mrkt == '41')) and (dstid == 42))) " +
-                "or (((srcid == 43) and (mrkt == '44')) and (dstid == 45))) " +
-                "or (((srcid == 46) and (mrkt == '47')) and (dstid == 48))) " +
-                "or (((srcid == 49) and (mrkt == '50')) and (dstid == 51))) " +
-                "or (((srcid == 52) and (mrkt == '53')) and (dstid == 54))) " +
-                "or (((srcid == 55) and (mrkt == '56')) and (dstid == 57))) " +
-                "or (((srcid == 58) and (mrkt == '59')) and (dstid == 60))) " +
-                "or (((srcid == 61) and (mrkt == '62')) and (dstid == 63))) " +
-                "or (((srcid == 64) and (mrkt == '65')) and (dstid == 66))) " +
-                "or (((srcid == 67) and (mrkt == '68')) and (dstid == 69)))",
-                null);
-    }
-
-    // UDF expression should make the entire filter be rejected
-    @Test
-    public void testAndOrConditionMixedWithUdfExpr() throws Exception {
-        String q = query + "b = filter a by " +
-                "(UPPER(name) == 'FOO')" +
-                "or (srcid == 1 and mrkt == '2' and dstid == 3) " +
-                "or (srcid == 4 and mrkt == '5' and dstid == 6) " +
-                "or (srcid == 7 and mrkt == '8' and dstid == 9);" +
-                "store b into 'out';";
-        negativeTest(q, Arrays.asList("srcid", "mrkt", "dstid"));
-    }
-
-    // Cast expression should make the entire filter be rejected
-    @Test
-    public void testAndOrConditionMixedWithCastExpr() throws Exception {
-        String q = query + "b = filter a by " +
-                "(srcid == 1 and mrkt == '2' and dstid == 3) " +
-                "or (srcid == 4 and (int)mrkt == 5 and dstid == 6) " +
-                "or (srcid == 7 and mrkt == '8' and dstid == 9);" +
-                "store b into 'out';";
-        negativeTest(q, Arrays.asList("srcid", "mrkt", "dstid"));
-    }
-
-    // Null expression should make the entire filter be rejected
-    @Test
-    public void testAndOrConditionMixedWithNullExpr() throws Exception {
-        String q = query + "b = filter a by " +
-                "(srcid == 1 and mrkt == '2' and dstid == 3) " +
-                "or (srcid == 4 and mrkt == '5' and dstid == 6) " +
-                "or (srcid == 7 and mrkt == '8' and dstid == 9) " +
-                "or (name is null);" +
-                "store b into 'out';";
-        negativeTest(q, Arrays.asList("srcid", "mrkt", "dstid"));
-    }
-
-    //// helper methods ///////
-
-    private PColFilterExtractor test(String query, List<String> partitionCols,
-            String expPartFilterString) throws Exception {
-        return test(query, partitionCols, expPartFilterString, null, true);
-    }
-
-    private PColFilterExtractor test(String query, List<String> partitionCols,
-            String expPartFilterString, String expFilterString) throws Exception {
-        return test(query, partitionCols, expPartFilterString, expFilterString, false);
-    }
-
-    private PColFilterExtractor test(String query, List<String> partitionCols,
-            String expPartFilterString, String expFilterString, boolean skipTrimmedFilterCheck)
-    throws Exception {
-        PigServer pigServer = new PigServer( pc );
-        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        Operator op = newLogicalPlan.getSinks().get(0);
-        LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
-        PColFilterExtractor pColExtractor = new PColFilterExtractor(
-                filter.getFilterPlan(), partitionCols);
-        pColExtractor.visit();
-
-        if(expPartFilterString == null) {
-            Assert.assertEquals("Checking partition column filter:", null,
-                    pColExtractor.getPColCondition());
-        } else  {
-            Assert.assertEquals("Checking partition column filter:",
-                    expPartFilterString,
-                    pColExtractor.getPColCondition().toString());
-        }
-
-        // The getExpression() in PColFilterExtractor was written to get the
-        // pushdown filter expression and does not have support for columns of
-        // type tuple or map as partition columns are expected to be of
-        // primitive data type. But we are using the method in the tests for forming
-        // trimmed filter after pushdown. So skip check in cases where we expect a
-        // trimmed filter to have a map or tuple in the condition.
-        if (!skipTrimmedFilterCheck) {
-            if (expFilterString == null) {
-                Assert.assertTrue("Check that filter can be removed:",
-                        pColExtractor.isFilterRemovable());
-            } else {
-                String actual = pColExtractor
-                        .getExpression(
-                                (LogicalExpression) filter.getFilterPlan().getSources().get(0))
-                        .toString();
-                Assert.assertEquals("checking trimmed filter expression:", expFilterString, actual);
-            }
-        }
-        return pColExtractor;
-    }
-
-    // The filter cannot be pushed down unless it meets certain conditions. In
-    // that case, PColExtractor.getPColCondition() should return null.
-    private void negativeTest(String query, List<String> partitionCols) throws Exception {
-        PigServer pigServer = new PigServer( pc );
-        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        Operator op = newLogicalPlan.getSinks().get(0);
-        LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
-        PColFilterExtractor pColExtractor = new PColFilterExtractor(
-                filter.getFilterPlan(), partitionCols);
-        pColExtractor.visit();
-        Assert.assertFalse(pColExtractor.canPushDown());
-        Assert.assertNull(pColExtractor.getPColCondition());
-    }
-
-    private void negativeTest(String query, List<String> partitionCols,
-            int expectedErrorCode) throws Exception {
-        PigServer pigServer = new PigServer( pc );
-        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        Operator op = newLogicalPlan.getSinks().get(0);
-        LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
-        PColFilterExtractor pColExtractor = new PColFilterExtractor(
-                filter.getFilterPlan(), partitionCols);
-        try {
-            pColExtractor.visit();
-        } catch(Exception e) {
-            Assert.assertEquals("Checking if exception has right error code",
-                    expectedErrorCode, LogUtils.getPigException(e).getErrorCode());
-            return;
-        }
-    }
-
-    /**
-     * this loader is only used to test that parition column filters are given
-     * in the manner expected in terms of column names - hence it does not
-     * implement many of the methods and only implements required ones.
-     */
-    public static class TestLoader extends LoadFunc implements LoadMetadata {
-
-        Schema schema;
-        String[] partCols;
-        static Expression partFilter = null;
-
-        public TestLoader(String schemaString, String commaSepPartitionCols)
-        throws ParserException {
-            schema = Utils.getSchemaFromString(schemaString);
-            partCols = commaSepPartitionCols.split(",");
-        }
-
-        @Override
-        public InputFormat getInputFormat() throws IOException {
-            return null;
-        }
-
-        @Override
-        public Tuple getNext() throws IOException {
-            return null;
-        }
-
-        @Override
-        public void prepareToRead(RecordReader reader, PigSplit split)
-        throws IOException {
-        }
-
-        @Override
-        public void setLocation(String location, Job job) throws IOException {
-        }
-
-        @Override
-        public String[] getPartitionKeys(String location, Job job)
-        throws IOException {
-            return partCols;
-        }
-
-        @Override
-        public ResourceSchema getSchema(String location, Job job)
-        throws IOException {
-            return new ResourceSchema(schema);
-        }
-
-        @Override
-        public ResourceStatistics getStatistics(String location,
-                Job job) throws IOException {
-            return null;
-        }
-
-        @Override
-        public void setPartitionFilter(Expression partitionFilter)
-        throws IOException {
-            partFilter = partitionFilter;
-        }
-
-    }
-
-    public class MyPlanOptimizer extends LogicalPlanOptimizer {
-        protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
-            super( p, iterations, new HashSet<String>() );
-        }
-
-        protected List<Set<Rule>> buildRuleSets() {
-            List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
-
-            Set<Rule> s = new HashSet<Rule>();
-            // add split filter rule
-            Rule r = new PartitionFilterOptimizer("PartitionFilterPushDown");
-            s = new HashSet<Rule>();
-            s.add(r);
-            ls.add(s);
-
-            r = new LoadTypeCastInserter( "LoadTypeCastInserter" );
-            s = new HashSet<Rule>();
-            s.add(r);
-            ls.add(s);
-            return ls;
-        }
-    }
-
-    // Helper Functions
-    public LogicalPlan buildPlan(PigServer pigServer, String query) throws Exception {
-    	try {
-            return Util.buildLp(pigServer, query);
-    	} catch(Throwable t) {
-    		throw new AssertionFailedError(t.getMessage());
-    	}
-    }
-
-}