You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by an...@apache.org on 2013/09/24 20:59:54 UTC

svn commit: r1525970 - in /pig/trunk: ./ conf/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/optimizer/ src/org/apache/pig/newplan/logical/rules/ test/org/apache/pi...

Author: aniket486
Date: Tue Sep 24 18:59:53 2013
New Revision: 1525970

URL: http://svn.apache.org/r1525970
Log:
PIG-3461: Rewrite PartitionFilterOptimizer to make it work for all the cases (aniket486)

Added:
    pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java
    pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/src/org/apache/pig/PigConstants.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
    pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1525970&r1=1525969&r2=1525970&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Sep 24 18:59:53 2013
@@ -40,6 +40,8 @@ PIG-3174: Remove rpm and deb artifacts f
 
 IMPROVEMENTS
 
+PIG-3461: Rewrite PartitionFilterOptimizer to make it work for all the cases (aniket486)
+
 PIG-2417: Streaming UDFs - allow users to easily write UDFs in scripting languages with no
           JVM implementation. (jeremykarn via daijy)
 

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1525970&r1=1525969&r2=1525970&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Tue Sep 24 18:59:53 2013
@@ -216,3 +216,7 @@ pig.location.check.strict=false
 # This option is used to define whether to support recovery to handle the
 # application master getting restarted.
 # pig.output.committer.recovery.support=true
+
+# Set this option to true if you need to use the old partition filter optimizer. 
+# Note: The old partition filter optimizer (based on PColFilterExtractor) will be deprecated in the future.
+# pig.exec.useOldPartitionFilterOptimizer=true

Modified: pig/trunk/src/org/apache/pig/PigConstants.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConstants.java?rev=1525970&r1=1525969&r2=1525970&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConstants.java (original)
+++ pig/trunk/src/org/apache/pig/PigConstants.java Tue Sep 24 18:59:53 2013
@@ -46,4 +46,11 @@ public class PigConstants {
      * by default, all rules are enabled.
      */
     public static final String PIG_OPTIMIZER_RULES_DISABLED_KEY = "pig.optimizer.rules.disabled";
+
+    /**
+     * flag to use old PartitionFilterOptimizer in case NewPartitionFilterOptimizer is not backwards compatible
+     * (A known case is "filter a by 1 == 0").
+     */
+    public static final String PIG_EXEC_OLD_PART_FILTER_OPTIMIZER = "pig.exec.useOldPartitionFilterOptimizer";
+
 }
\ No newline at end of file

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1525970&r1=1525969&r2=1525970&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Sep 24 18:59:53 2013
@@ -260,6 +260,14 @@ public abstract class HExecutionEngine i
             disabledOptimizerRules = new HashSet<String>();
         }
 
+        if( ! Boolean.valueOf(this.pigContext.getProperties().getProperty(
+                PigConstants.PIG_EXEC_OLD_PART_FILTER_OPTIMIZER, "false"))){
+            // Turn off the old partition filter optimizer
+            disabledOptimizerRules.add("PartitionFilterOptimizer");
+        } else {
+            disabledOptimizerRules.add("NewPartitionFilterOptimizer");
+        }
+
         String pigOptimizerRulesDisabled = this.pigContext.getProperties()
                 .getProperty(PigConstants.PIG_OPTIMIZER_RULES_DISABLED_KEY);
         if (pigOptimizerRulesDisabled != null) {
@@ -270,6 +278,7 @@ public abstract class HExecutionEngine i
         if (pigContext.inIllustrator) {
             disabledOptimizerRules.add("MergeForEach");
             disabledOptimizerRules.add("PartitionFilterOptimizer");
+            disabledOptimizerRules.add("NewPartitionFilterOptimizer");
             disabledOptimizerRules.add("LimitOptimizer");
             disabledOptimizerRules.add("SplitFilter");
             disabledOptimizerRules.add("PushUpFilter");

Added: pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java?rev=1525970&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java Tue Sep 24 18:59:53 2013
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.Expression;
+import org.apache.pig.Expression.OpType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.logical.expression.AddExpression;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.BinaryExpression;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.DivideExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
+import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.LessThanExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ModExpression;
+import org.apache.pig.newplan.logical.expression.MultiplyExpression;
+import org.apache.pig.newplan.logical.expression.NotEqualExpression;
+import org.apache.pig.newplan.logical.expression.OrExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.RegexExpression;
+import org.apache.pig.newplan.logical.expression.SubtractExpression;
+
+/**
+ * This is a rewrite of {@code PColFilterExtractor}
+ *
+ * We traverse the expression plan bottom up and separate it into two plans
+ * - pushdownExprPlan, plan that can be pushed down to the loader and
+ * - filterExprPlan, remaining plan that needs to be evaluated by pig
+ *
+ */
+public class FilterExtractor {
+
+    private static final Log LOG = LogFactory.getLog(FilterExtractor.class);
+
+    /**
+     * partition columns associated with the table
+     * present in the load on which the filter whose
+     * inner plan is being visited is applied
+     */
+    private List<String> partitionCols;
+
+    /**
+     * We visit this plan to create the filteredPlan
+     */
+    protected LogicalExpressionPlan originalPlan;
+
+    /**
+     * Plan that is created after all pushable filters are removed
+     */
+    protected LogicalExpressionPlan filteredPlan;
+
+    /**
+     * Plan that can be pushed down
+     */
+    protected LogicalExpressionPlan pushdownExprPlan;
+
+    /**
+     * Final filterExpr after we are done
+     */
+    private LogicalExpression filterExpr = null;
+
+    /**
+     * {@code Expression} to pushdown
+     */
+    private Expression pushdownExpr = null;
+
+    /**
+     *
+     * @param plan logical plan corresponding to the filter's comparison condition
+     * @param partitionCols list of partition columns of the table which is
+     * being loaded in the LOAD statement which is input to the filter
+     */
+    public FilterExtractor(LogicalExpressionPlan plan,
+            List<String> partitionCols) {
+        this.originalPlan = plan;
+        this.partitionCols = new ArrayList<String>(partitionCols);
+        this.filteredPlan = new LogicalExpressionPlan();
+        this.pushdownExprPlan = new LogicalExpressionPlan();
+    }
+
+    public void visit() throws FrontendException {
+        // we will visit the leaf and it will recursively walk the plan
+        LogicalExpression leaf = (LogicalExpression)originalPlan.getSources().get( 0 );
+        // if the leaf is a unary operator it should be a FilterFunc in
+        // which case we don't try to extract partition filter conditions
+        if(leaf instanceof BinaryExpression) {
+            // recursively traverse the tree bottom up
+            // checkPushdown returns KeyState which is pair of LogicalExpression
+            BinaryExpression binExpr = (BinaryExpression)leaf;
+            KeyState finale = checkPushDown(binExpr);
+            this.filterExpr = finale.filterExpr;
+            this.pushdownExpr = getExpression(finale.pushdownExpr);
+        }
+    }
+
+    /**
+     * @return new filtered plan after pushdownable filters are removed
+     */
+    public LogicalExpressionPlan getFilteredPlan() {
+        return filteredPlan;
+    }
+
+    /**
+     * @return true if pushdown is possible
+     */
+    public boolean canPushDown() {
+        return pushdownExpr != null;
+    }
+
+    /**
+     * @return true if no residual filter expression remains, i.e. the filter can be removed
+     */
+    public boolean isFilterRemovable() {
+        return filterExpr == null;
+    }
+
+    /**
+     * @return the condition on partition columns extracted from filter
+     */
+    public  Expression getPColCondition(){
+        return pushdownExpr;
+    }
+
+    private class KeyState {
+        LogicalExpression pushdownExpr;
+        LogicalExpression filterExpr;
+    }
+
+    private KeyState checkPushDown(LogicalExpression op) throws FrontendException {
+        // Note: Currently, Expression interface only understands 3 Expression Types
+        // (Look at getExpression below) BinaryExpression, ProjectExpression and ConstantExpression
+        if(op instanceof ProjectExpression) {
+            return checkPushDown((ProjectExpression)op);
+        } else if (op instanceof BinaryExpression) {
+            return checkPushDown((BinaryExpression)op);
+        } else if (op instanceof ConstantExpression) {
+            // Constants can be pushdown
+            KeyState state = new KeyState();
+            state.pushdownExpr = op;
+            state.filterExpr = null;
+            return state;
+        } else {
+            KeyState state = new KeyState();
+            state.pushdownExpr = null;
+            state.filterExpr = addToFilterPlan(op);
+            return state;
+        }
+    }
+
+    private LogicalExpression addToFilterPlan(LogicalExpression op) throws FrontendException {
+        // This copies the whole tree underneath op
+        LogicalExpression newOp = op.deepCopy(filteredPlan);
+        return newOp;
+    }
+
+    private LogicalExpression andLogicalExpressions(
+            LogicalExpressionPlan plan, LogicalExpression a, LogicalExpression b) {
+        if (a == null) {
+            return b;
+        }
+        if (b == null) {
+            return a;
+        }
+        LogicalExpression andOp = new AndExpression(plan, a, b);
+        return andOp;
+    }
+
+    private LogicalExpression orLogicalExpressions(
+            LogicalExpressionPlan plan, LogicalExpression a, LogicalExpression b) {
+        // Or 2 operators if they are not null
+        if (a == null || b == null) {
+            return null;
+        }
+        LogicalExpression orOp = new OrExpression(plan, a, b);
+        return orOp;
+    }
+
+    private KeyState checkPushDown(BinaryExpression binExpr) throws FrontendException {
+        KeyState state = new KeyState();
+        KeyState leftState = checkPushDown(binExpr.getLhs());
+        KeyState rightState = checkPushDown(binExpr.getRhs());
+
+        if (binExpr instanceof AndExpression) {
+            // AND is commutative
+            // Expression =
+            // (leftState.pushdownExpr AND leftState.filterExpr)
+            // AND (rightState.pushdownExpr AND rightState.filterExpr)
+            //
+            // pushDownExpr = (leftState.pushdownExpr AND rightState.pushdownExpr)
+            // filterExpr = (leftState.filterExpr AND rightState.filterExpr)
+            state.pushdownExpr = andLogicalExpressions(pushdownExprPlan, leftState.pushdownExpr, rightState.pushdownExpr);
+            state.filterExpr = andLogicalExpressions(filteredPlan, leftState.filterExpr, rightState.filterExpr);
+        } else if (binExpr instanceof OrExpression) {
+            // Expression =
+            // (leftState.pushdownExpr AND leftState.filterExpr)
+            // OR (rightState.pushdownExpr AND rightState.filterExpr)
+            //
+            // This could be rewritten with distributive property as
+            // (leftState.pushdownExpr OR rightState.pushdownExpr)
+            // AND
+            // ( (leftState.pushdownExpr OR rightState.filterExpr)
+            // AND (leftState.filterExpr OR rightState.pushdownExpr)
+            // AND (leftState.filterExpr OR rightState.filterExpr)
+            // )
+            // In other words,
+            // pushdownExpr = leftState.pushdownExpr OR rightState.pushdownExpr
+            // filterExpr = (leftState.pushdownExpr OR rightState.filterExpr)
+            //              AND (leftState.filterExpr OR rightState.pushdownExpr)
+            //              AND (leftState.filterExpr OR rightState.filterExpr)
+            state.pushdownExpr = orLogicalExpressions(pushdownExprPlan, leftState.pushdownExpr, rightState.pushdownExpr);
+            if(state.pushdownExpr == null) {
+                // Whatever we did so far on the right tree is all wasted :(
+                // Undo all the mutation (AND OR distributions) until now
+                removeFromFilteredPlan(leftState.filterExpr);
+                removeFromFilteredPlan(rightState.filterExpr);
+                state.filterExpr = addToFilterPlan(binExpr);
+            } else {
+                LogicalExpression f1 = orLogicalExpressions(filteredPlan, leftState.pushdownExpr, rightState.filterExpr);
+                LogicalExpression f2 = orLogicalExpressions(filteredPlan, leftState.filterExpr, rightState.pushdownExpr);
+                LogicalExpression f3 = orLogicalExpressions(filteredPlan, leftState.filterExpr, rightState.filterExpr);
+                state.filterExpr = andLogicalExpressions(filteredPlan, f1, andLogicalExpressions(filteredPlan, f2, f3));
+            }
+        } else {
+            // leftState OP rightState
+            if (leftState.filterExpr == null && rightState.filterExpr == null) {
+                state.pushdownExpr = binExpr;
+                state.filterExpr = null;
+            } else {
+                state.pushdownExpr = null;
+                removeFromFilteredPlan(leftState.filterExpr);
+                removeFromFilteredPlan(rightState.filterExpr);
+                state.filterExpr = addToFilterPlan(binExpr);
+            }
+        }
+        return state;
+    }
+
+    private KeyState checkPushDown(ProjectExpression project) throws FrontendException {
+        String fieldName = project.getFieldSchema().alias;
+        KeyState state = new KeyState();
+        if(partitionCols.contains(fieldName)) {
+            state.filterExpr = null;
+            state.pushdownExpr = project;
+        } else {
+            state.filterExpr = addToFilterPlan(project);
+            state.pushdownExpr = null;
+        }
+        return state;
+    }
+
+    /**
+     * Assume that the given operator is already disconnected from its predecessors.
+     * @param op
+     * @throws FrontendException
+     */
+    private void removeFromFilteredPlan(Operator op) throws FrontendException {
+        List<Operator> succs = filteredPlan.getSuccessors( op );
+        if( succs == null ) {
+            filteredPlan.remove( op );
+            return;
+        }
+
+        Operator[] children = new Operator[succs.size()];
+        for( int i = 0; i < succs.size(); i++ ) {
+            children[i] = succs.get(i);
+        }
+
+        for( Operator succ : children ) {
+            filteredPlan.disconnect( op, succ );
+            removeFromFilteredPlan( succ );
+        }
+
+        filteredPlan.remove( op );
+    }
+
+    public static Expression getExpression(LogicalExpression op) throws FrontendException
+    {
+        if(op == null) {
+            return null;
+        }
+        if(op instanceof ConstantExpression) {
+            ConstantExpression constExpr =(ConstantExpression)op ;
+            return new Expression.Const( constExpr.getValue() );
+        } else if (op instanceof ProjectExpression) {
+            ProjectExpression projExpr = (ProjectExpression)op;
+            String fieldName = projExpr.getFieldSchema().alias;
+            return new Expression.Column(fieldName);
+        } else {
+            if( !( op instanceof BinaryExpression ) ) {
+                LOG.error("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
+                throw new FrontendException("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
+            }
+            BinaryExpression binOp = (BinaryExpression)op;
+            if(binOp instanceof AddExpression) {
+                return getExpression( binOp, OpType.OP_PLUS );
+            } else if(binOp instanceof SubtractExpression) {
+                return getExpression(binOp, OpType.OP_MINUS);
+            } else if(binOp instanceof MultiplyExpression) {
+                return getExpression(binOp, OpType.OP_TIMES);
+            } else if(binOp instanceof DivideExpression) {
+                return getExpression(binOp, OpType.OP_DIV);
+            } else if(binOp instanceof ModExpression) {
+                return getExpression(binOp, OpType.OP_MOD);
+            } else if(binOp instanceof AndExpression) {
+                return getExpression(binOp, OpType.OP_AND);
+            } else if(binOp instanceof OrExpression) {
+                return getExpression(binOp, OpType.OP_OR);
+            } else if(binOp instanceof EqualExpression) {
+                return getExpression(binOp, OpType.OP_EQ);
+            } else if(binOp instanceof NotEqualExpression) {
+                return getExpression(binOp, OpType.OP_NE);
+            } else if(binOp instanceof GreaterThanExpression) {
+                return getExpression(binOp, OpType.OP_GT);
+            } else if(binOp instanceof GreaterThanEqualExpression) {
+                return getExpression(binOp, OpType.OP_GE);
+            } else if(binOp instanceof LessThanExpression) {
+                return getExpression(binOp, OpType.OP_LT);
+            } else if(binOp instanceof LessThanEqualExpression) {
+                return getExpression(binOp, OpType.OP_LE);
+            } else if(binOp instanceof RegexExpression) {
+                return getExpression(binOp, OpType.OP_MATCH);
+            } else {
+                LOG.error("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
+                throw new FrontendException("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
+            }
+        }
+    }
+
+    private static Expression getExpression(BinaryExpression binOp, OpType
+            opType) throws FrontendException {
+        return new Expression.BinaryExpression(getExpression(binOp.getLhs())
+                , getExpression(binOp.getRhs()), opType);
+    }
+}

Modified: pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java?rev=1525970&r1=1525969&r2=1525970&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java Tue Sep 24 18:59:53 2013
@@ -64,6 +64,7 @@ import org.apache.pig.newplan.DepthFirst
  * The condition on partition cols will be used to prune partitions of the table.
  *
  */
+@Deprecated
 public class PColFilterExtractor extends PlanVisitor {
 
     private static final Log LOG = LogFactory.getLog(PColFilterExtractor.class);

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1525970&r1=1525969&r2=1525970&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Tue Sep 24 18:59:53 2013
@@ -75,9 +75,8 @@ public class LogicalPlanOptimizer extend
     }
 
     protected List<Set<Rule>> buildRuleSets() {
-        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();	    
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
 
-        
         // ImplicitSplitInserter set
         // This set of rules Insert Foreach dedicated for casting after load
         Set<Rule> s = new HashSet<Rule>();
@@ -94,7 +93,18 @@ public class LogicalPlanOptimizer extend
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);
-        
+
+        // Partition filter set
+        // This set of rules push partition filter to LoadFunc
+        // Important: do this before LogicalExpressionSimplifier so that size of filter can be reduced
+        // (However, it's not necessary to do it before LogicalExpressionSimplifier)
+        s = new HashSet<Rule>();
+        // Optimize partition filter
+        r = new PartitionFilterOptimizer("NewPartitionFilterOptimizer");
+        checkAndAddRule(s, r);
+        if (!s.isEmpty())
+            ls.add(s);
+
         // Logical expression simplifier
         s = new HashSet<Rule>();
         // add logical expression simplification rule
@@ -112,7 +122,7 @@ public class LogicalPlanOptimizer extend
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);
-        
+
         // Split Set
         // This set of rules does splitting of operators only.
         // It does not move operators
@@ -122,8 +132,8 @@ public class LogicalPlanOptimizer extend
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);
-        
-        
+
+
         // Push Set,
         // This set does moving of operators only.
         s = new HashSet<Rule>();
@@ -133,17 +143,17 @@ public class LogicalPlanOptimizer extend
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);
-        
+
         // Merge Set
         // This Set merges operators but does not move them.
         s = new HashSet<Rule>();
         checkAndAddRule(s, r);
         // add merge filter rule
-        r = new MergeFilter("MergeFilter");        
+        r = new MergeFilter("MergeFilter");
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);
-        
+
         // Partition filter set
         // This set of rules push partition filter to LoadFunc
         s = new HashSet<Rule>();
@@ -152,7 +162,7 @@ public class LogicalPlanOptimizer extend
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);
-        
+
         // PushDownForEachFlatten set
         s = new HashSet<Rule>();
         // Add the PushDownForEachFlatten
@@ -160,7 +170,7 @@ public class LogicalPlanOptimizer extend
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);
-        
+
         // Prune Set
         // This set is used for pruning columns and maps
         s = new HashSet<Rule>();
@@ -169,7 +179,7 @@ public class LogicalPlanOptimizer extend
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);
-        
+
         // Add LOForEach set
         s = new HashSet<Rule>();
         // Add the AddForEach
@@ -177,7 +187,7 @@ public class LogicalPlanOptimizer extend
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);
-        
+
         // Add MergeForEach set
         s = new HashSet<Rule>();
         // Add the AddForEach
@@ -185,14 +195,14 @@ public class LogicalPlanOptimizer extend
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);
-        
+
         //set parallelism to 1 for cogroup/group-by on constant
         s = new HashSet<Rule>();
         r = new GroupByConstParallelSetter("GroupByConstParallelSetter");
         checkAndAddRule(s, r);
         if(!s.isEmpty())
             ls.add(s);
-        
+
         // Limit Set
         // This set of rules push up limit
         s = new HashSet<Rule>();
@@ -201,7 +211,7 @@ public class LogicalPlanOptimizer extend
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);
-        
+
         return ls;
     }
 

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=1525970&r1=1525969&r2=1525970&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Tue Sep 24 18:59:53 2013
@@ -31,6 +31,7 @@ import org.apache.pig.LoadMetadata;
 import org.apache.pig.Expression.BinaryExpression;
 import org.apache.pig.Expression.Column;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.FilterExtractor;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.OperatorSubPlan;
@@ -95,11 +96,47 @@ public class PartitionFilterOptimizer ex
 
     @Override
     public Transformer getNewTransformer() {
-        return new PartitionFilterPushDownTransformer();
+        if(name.equals("PartitionFilterOptimizer")) {
+            return new PartitionFilterPushDownTransformer();
+        } else {
+            return new NewPartitionFilterPushDownTransformer();
+        }
     }
-    
+
+    public class NewPartitionFilterPushDownTransformer extends PartitionFilterPushDownTransformer {
+        @Override
+        public void transform(OperatorPlan matched) throws FrontendException {
+            subPlan = new OperatorSubPlan( currentPlan );
+
+            setupColNameMaps();
+
+            FilterExtractor filterFinder = new FilterExtractor(
+                    loFilter.getFilterPlan(), getMappedKeys( partitionKeys ) );
+            filterFinder.visit();
+            Expression partitionFilter = filterFinder.getPColCondition();
+
+            if(partitionFilter != null) {
+                // the column names in the filter may be the ones provided by
+                // the user in the schema in the load statement - we may need
+                // to replace them with partition column names as given by
+                // LoadFunc.getSchema()
+                updateMappedColNames(partitionFilter);
+                try {
+                    loadMetadata.setPartitionFilter(partitionFilter);
+                } catch (IOException e) {
+                    throw new FrontendException( e );
+                }
+                if(filterFinder.isFilterRemovable()) {
+                    currentPlan.removeAndReconnect( loFilter );
+                } else {
+                    loFilter.setFilterPlan(filterFinder.getFilteredPlan());
+                }
+            }
+        }
+    }
+
     public class PartitionFilterPushDownTransformer extends Transformer {
-        private OperatorSubPlan subPlan;
+        protected OperatorSubPlan subPlan;
 
         @Override
         public boolean check(OperatorPlan matched) throws FrontendException {
@@ -128,12 +165,9 @@ public class PartitionFilterOptimizer ex
 				throw new FrontendException( e );
 			}
             if( partitionKeys == null || partitionKeys.length == 0 ) {
-            	return false;
+                return false;
             }
             
-//            LogicalExpressionPlan filterExpr = filter.getFilterPlan();
-            
-            // we found a load-filter pattern where the load returns partition keys
             return true;
         }
 
@@ -174,12 +208,12 @@ public class PartitionFilterOptimizer ex
         		if(pColFilterFinder.isFilterRemovable()) {  
         			currentPlan.removeAndReconnect( loFilter );
         		} else {
-        		    loFilter.setFilterPlan(filterExprCopy);
-        		}
-        	}
+                    loFilter.setFilterPlan(filterExprCopy);
+                }
+            }
         }
         
-        private void updateMappedColNames(Expression expr) {
+        protected void updateMappedColNames(Expression expr) {
             if(expr instanceof BinaryExpression) {
                 updateMappedColNames(((BinaryExpression) expr).getLhs());
                 updateMappedColNames(((BinaryExpression) expr).getRhs());
@@ -198,7 +232,7 @@ public class PartitionFilterOptimizer ex
          * @param partitionKeys
          * @return
          */
-        private List<String> getMappedKeys(String[] partitionKeys) {
+        protected List<String> getMappedKeys(String[] partitionKeys) {
             List<String> mappedKeys = new ArrayList<String>(partitionKeys.length);
             for (int i = 0; i < partitionKeys.length; i++) {
                 mappedKeys.add(colNameMap.get(partitionKeys[i]));
@@ -206,11 +240,11 @@ public class PartitionFilterOptimizer ex
             return mappedKeys;
         }
 
-        private void setupColNameMaps() throws FrontendException {
+        protected void setupColNameMaps() throws FrontendException {
             LogicalSchema loLoadSchema = loLoad.getSchema();
             LogicalSchema loadFuncSchema = loLoad.getDeterminedSchema();
              for(int i = 0; i < loadFuncSchema.size(); i++) {
-                colNameMap.put(loadFuncSchema.getField(i).alias, 
+                colNameMap.put(loadFuncSchema.getField(i).alias,
                         (i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
                             loadFuncSchema.getField(i).alias));
                 

Added: pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java?rev=1525970&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java Tue Sep 24 18:59:53 2013
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import junit.framework.AssertionFailedError;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.ExecType;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigServer;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.newplan.FilterExtractor;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.BinaryExpression;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.DereferenceExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.IsNullExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.MapLookupExpression;
+import org.apache.pig.newplan.logical.expression.OrExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
+import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.parser.ParserException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for extracting the new partition filter conditions out of the
+ * condition of a filter that follows a load which talks to a metadata system (i.e.
+ * implements {@link LoadMetadata})
+ */
+public class TestNewPartitionFilterPushDown {
+    static PigContext pc = new PigContext(ExecType.LOCAL, new Properties()); // local-mode context shared by all tests
+    String query = "a = load 'foo' as (srcid:int, mrkt:chararray, dstid:int, name:chararray, " +
+            "age:int, browser:map[], location:tuple(country:chararray, zip:int));"; // load with explicit schema, no loader named
+    String loadquery = "a = load 'foo' using "
+            + TestLoader.class.getName() +
+            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, " +
+            "age:int, browser:map[], location:tuple(country:chararray, zip:int)', " +
+            "'%s');"; // template: %s becomes TestLoader's 2nd argument (presumably its partition keys - TODO confirm)
+    String query2 = String.format(loadquery, "srcid,dstid"); // template filled with 'srcid,dstid'
+    String query3 = String.format(loadquery, "srcid"); // template filled with 'srcid'
+
+    /**
+     * A single condition on a partition column (srcid) AND-ed with a single condition
+     * on a non partition column (name); the two are expected to be separated.
+     */
+    @Test
+    public void testSimpleMixed() throws Exception {
+        final List<String> partitionCols = Arrays.asList("srcid");
+        final String script = query + "b = filter a by srcid == 10 and name == 'foo';" + "store b into 'out';";
+        test(script, partitionCols, "(srcid == 10)", "(name == 'foo')");
+    }
+
+    /**
+     * Filter without any condition on a partition column: null is passed for the
+     * partition filter and the complete condition is expected to remain.
+     */
+    @Test
+    public void testNoPartFilter() throws Exception {
+        final String script = query + "b = filter a by age == 20 and name == 'foo';" + "store b into 'out';";
+        final String remaining = "((age == 20) and (name == 'foo'))";
+        test(script, Arrays.asList("srcid"), null, remaining);
+    }
+
+    /**
+     * Filter made only of conditions on partition columns (srcid and mrkt, AND-ed).
+     * @throws Exception
+     */
+    @Test
+    public void testOnlyPartFilter1() throws Exception {
+        final List<String> partitionCols = Arrays.asList("srcid", "mrkt");
+        final String script = query + "b = filter a by srcid > 20 and mrkt == 'us';" + "store b into 'out';";
+        final String pushed = "((srcid > 20) and (mrkt == 'us'))";
+        test(script, partitionCols, pushed, null);
+    }
+
+    /**
+     * Filter made only of a condition on one of the two partition columns (mrkt).
+     * @throws Exception
+     */
+    @Test
+    public void testOnlyPartFilter2() throws Exception {
+        final List<String> partitionCols = Arrays.asList("srcid", "mrkt");
+        final String script = query + "b = filter a by mrkt == 'us';" + "store b into 'out';";
+        final String pushed = "(mrkt == 'us')";
+        test(script, partitionCols, pushed, null);
+    }
+
+    /**
+     * Filter made only of conditions on partition columns (srcid and mrkt, OR-ed).
+     * @throws Exception
+     */
+    @Test
+    public void testOnlyPartFilter3() throws Exception {
+        final List<String> partitionCols = Arrays.asList("srcid", "mrkt");
+        final String script = query + "b = filter a by srcid == 20 or mrkt == 'us';" + "store b into 'out';";
+        final String pushed = "((srcid == 20) or (mrkt == 'us'))";
+        test(script, partitionCols, pushed, null);
+    }
+
+    /**
+     * Conditions on partition and non partition columns spread over two AND groups;
+     * the condition is split so that the partition column conditions are extracted.
+     */
+    @Test
+    public void testMixed1() throws Exception {
+        final String script = query + "b = filter a by " +
+                "(age < 20 and  mrkt == 'us') and (srcid == 10 and " +
+                "name == 'foo');" + "store b into 'out';";
+        final List<String> partitionCols = Arrays.asList("srcid", "mrkt");
+        final String pushed = "((mrkt == 'us') and (srcid == 10))";
+        final String remaining = "((age < 20) and (name == 'foo'))";
+        test(script, partitionCols, pushed, remaining);
+    }
+
+
+    /**
+     * Two AND groups with three partition columns in play: the whole second group and
+     * the mrkt condition of the first are extracted, only the age condition remains.
+     */
+    @Test
+    public void testMixed2() throws Exception {
+        final String script = query + "b = filter a by " +
+                "(age >= 20 and  mrkt == 'us') and (srcid == 10 and " +
+                "dstid == 15);" + "store b into 'out';";
+        final List<String> partitionCols = Arrays.asList("srcid", "dstid", "mrkt");
+        final String pushed = "((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))";
+        final String remaining = "(age >= 20)";
+        test(script, partitionCols, pushed, remaining);
+    }
+
+    /**
+     * Flat chain of three AND-ed conditions, two of them on partition columns; the
+     * condition is split so that the partition column conditions are extracted.
+     */
+    @Test
+    public void testMixed3() throws Exception {
+        final String script = query + "b = filter a by " +
+                "age >= 20 and  mrkt == 'us' and srcid == 10;" + "store b into 'out';";
+        final List<String> partitionCols = Arrays.asList("srcid", "dstid", "mrkt");
+        test(script, partitionCols,
+                "((mrkt == 'us') and (srcid == 10))", "(age >= 20)");
+    }
+
+    /**
+     * Filter mixing conditions on partition and non partition columns, one of them a
+     * comparison of two partition columns with each other (srcid == dstid); that
+     * comparison is expected among the extracted conditions.
+     */
+    @Test
+    public void testMixed4() throws Exception {
+        final String script = query + "b = filter a by " +
+                "age >= 20 and  mrkt == 'us' and name == 'foo' and " +
+                "srcid == dstid;" + "store b into 'out';";
+        final List<String> partitionCols = Arrays.asList("srcid", "dstid", "mrkt");
+        final String pushed = "((mrkt == 'us') and (srcid == dstid))";
+        final String remaining = "((age >= 20) and (name == 'foo'))";
+        test(script, partitionCols, pushed, remaining);
+    }
+
+    /**
+     * Filter mixing conditions on partition and non partition columns, where two of
+     * the partition column conditions are combined with OR. The OR group and the
+     * dstid condition are expected to be extracted; the name condition stays in
+     * the filter.
+     */
+    @Test
+    public void testMixed5() throws Exception {
+        final String script = query + "b = filter a by " +
+                "(srcid == 10 or mrkt == 'us') and name == 'foo' and " +
+                "dstid == 30;" + "store b into 'out';";
+        final List<String> partitionCols = Arrays.asList("srcid", "dstid", "mrkt");
+        final String pushed = "(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))";
+        final String remaining = "(name == 'foo')";
+        test(script, partitionCols, pushed, remaining);
+    }
+
+    /**
+     * Filter mixing conditions on partition and non partition columns, where two of
+     * the partition column conditions are combined with OR and the OR group sits
+     * between the other conditions. The dstid condition and the OR group are expected
+     * to be extracted; the name condition stays in the filter.
+     */
+    @Test
+    public void testMixed6() throws Exception {
+        final String script = query + "b = filter a by " +
+                "dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';" + "store b into 'out';";
+        final List<String> partitionCols = Arrays.asList("srcid", "dstid", "mrkt");
+        final String pushed = "((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))";
+        final String remaining = "(name == 'foo')";
+        test(script, partitionCols, pushed, remaining);
+    }
+
+    /**
+     * Filter mixing conditions on partition and non partition columns in which one
+     * partition column condition contains arithmetic (srcid * 10 == 150 + 20); the
+     * arithmetic is expected to show up unevaluated in the extracted condition.
+     */
+    @Test
+    public void testMixedArith() throws Exception {
+        final String script = query + "b = filter a by " +
+                "mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;" + "store b into 'out';";
+        final List<String> partitionCols = Arrays.asList("srcid", "dstid", "mrkt");
+        final String pushed = "((mrkt == 'us') and ((srcid * 10) == (150 + 20)))";
+        final String remaining = "(age != 15)";
+        test(script, partitionCols, pushed, remaining);
+    }
+
+    /**
+     * A single condition on a partition column combined with conditions on a non
+     * partition column of type map, first with one lookup and then with a nested one.
+     * @throws Exception
+     */
+    @Test
+    public void testMixedNonPartitionTypeMap() throws Exception {
+        final String pushed = "(srcid == 10)";
+        // one map lookup
+        String script = query + "b = filter a by srcid == 10 and browser#'type' == 'IE';" +
+                "store b into 'out';";
+        test(script, Arrays.asList("srcid"), pushed, "(browser#'type' == IE)", true);
+
+        // an additional, nested map lookup
+        script = query + "b = filter a by srcid == 10 and browser#'type' == 'IE' and " +
+                "browser#'version'#'major' == '8.0';" + "store b into 'out';";
+        test(script, Arrays.asList("srcid"), pushed, "((browser#'type' == IE) and " +
+                "(browser#'version'#'major' == 8.0))", true);
+    }
+
+    /**
+     * Two stacked filters over a TestLoader load created with 'srcid,mrkt' (presumably its
+     * partition keys): the first holds only srcid/mrkt conditions, the second a map lookup.
+     * @throws Exception
+     */
+    @Test
+    public void testMixedNonPartitionTypeMapComplex() throws Exception {
+        TestLoader.partFilter = null;
+        // same load statement as the loadquery template produces for 'srcid,mrkt'
+        String q = String.format(loadquery, "srcid,mrkt") +
+                "b = filter a by srcid == 10 and mrkt > '1' and mrkt < '5';" +
+                "c = filter b by browser#'type' == 'IE';" + "store c into 'out';";
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
+
+        // fail with a readable message rather than an NPE when no filter was recorded
+        Assert.assertNotNull("no partition filter was set on the loader", TestLoader.partFilter);
+        Assert.assertEquals("checking partition filter:",
+                "(((srcid == 10) and (mrkt > '1')) and (mrkt < '5'))",
+                TestLoader.partFilter.toString());
+        Operator op = newLogicalPlan.getSinks().get(0);
+        LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
+        String actual = getTestExpression((LogicalExpression) filter.getFilterPlan().
+                getSources().get(0)).toString();
+        Assert.assertEquals("checking trimmed filter expression:", "(browser#'type' == IE)", actual);
+    }
+
+    /**
+     * A single condition on a partition column combined with a condition on a field of
+     * a non partition column of type tuple (location.country).
+     * @throws Exception
+     */
+    @Test
+    public void testMixedNonPartitionTypeTuple() throws Exception {
+        final String script = query + "b = filter a by srcid == 10 and location.country == 'US';" +
+                "store b into 'out';";
+        final String remaining = "(location.$0 == US)";
+        test(script, Arrays.asList("srcid"), "(srcid == 10)", remaining, true);
+    }
+
+    /**
+     * AND/OR combinations over partition keys. In the first four scenarios both columns
+     * of the OR-ed AND groups (srcid, dstid) are partition keys; the variants add a
+     * condition on another column or an "is null" condition on a partition key (kept out
+     * of the pushed down filter, per the inline comments). The last one is an AND of ORs.
+     * @throws Exception
+     */
+    @Test
+    public void testAndORConditionPartitionKeyCol() throws Exception {
+        // expected partition filter of the first three scenarios
+        final String orOfAnds = "((((srcid == 10) and (dstid == 5)) " +
+                "or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))";
+
+        // Case of AND and OR
+        String stmt = "b = filter a by (srcid == 10 and dstid == 5) " +
+                "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7);" +
+                "store b into 'out';";
+        test(query + stmt, Arrays.asList("srcid", "dstid"),
+                orOfAnds, null);
+        // TODO fix following test after PIG-3465
+        //testFull(query2 + stmt, orOfAnds, null, false);
+
+        // Additional filter on non-partition key column
+        stmt = "b = filter a by ((srcid == 10 and dstid == 5) " +
+                "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
+                "store b into 'out';";
+        test(query + stmt, Arrays.asList("srcid", "dstid"),
+                orOfAnds, "(mrkt == 'US')");
+        testFull(query2 + stmt, orOfAnds, "(mrkt == 'US')", false);
+
+        // Additional filter on partition key column that cannot be pushed
+        stmt = "b = filter a by ((srcid == 10 and dstid == 5) " +
+                "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7)) and srcid is null;" +
+                "store b into 'out';";
+        test(query + stmt, Arrays.asList("srcid", "dstid"),
+                orOfAnds, "(srcid is null)", true);
+
+        // partition key col but null condition which should not become part of
+        // the pushed down filter
+        stmt = "b = filter a by (srcid is null and dstid == 5) " +
+                "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7);" +
+                "store b into 'out';";
+        test(query + stmt, Arrays.asList("srcid", "dstid"),
+                "(((dstid == 5) or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
+                "(((srcid is null) or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))", true);
+
+        // Case of AND of ORs
+        stmt = "b = filter a by (mrkt == 'US' or mrkt == 'UK') and (srcid == 11 or srcid == 10);" +
+                "store b into 'out';";
+        test(query + stmt, Arrays.asList("srcid", "mrkt"),
+                "(((mrkt == 'US') or (mrkt == 'UK')) and ((srcid == 11) or (srcid == 10)))", null);
+        test(query + stmt, Arrays.asList("srcid"),
+                "((srcid == 11) or (srcid == 10))", "((mrkt == 'US') or (mrkt == 'UK'))");
+    }
+
+    @Test
+    public void testAndORConditionMixedCol() throws Exception {
+        // Case of AND and OR with partition key and non-partition key columns (only srcid is passed as partition column)
+        String q = "b = filter a by (srcid == 10 and dstid == 5) " +
+                "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7) " +
+                "or (srcid == 13 and dstid == 8);" +
+                "store b into 'out';";
+        test(query + q, Arrays.asList("srcid"), "((((srcid == 10) or (srcid == 11)) or (srcid == 12)) or (srcid == 13))",
+                "(((((srcid == 10) or (srcid == 11)) or (srcid == 12)) or (dstid == 8)) " +
+                        "and ((((((srcid == 10) or (srcid == 11)) or (dstid == 7)) and " +
+                        "(((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) " +
+                        "and ((dstid == 5) or (dstid == 6)))) or " +
+                        "(srcid == 12)) and ((((srcid == 10) or (dstid == 6)) " +
+                        "and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (dstid == 7)))) or (srcid == 13)) " +
+                        "and (((((srcid == 10) or (srcid == 11)) or (dstid == 7)) and (((((srcid == 10) or (dstid == 6)) " +
+                        "and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (srcid == 12)) " +
+                "and ((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (dstid == 7)))) or (dstid == 8))))");
+        // NOTE(review): the end-to-end variant below is disabled; the reason is not recorded here
+        //testFull(query3 + q, "((((srcid == 10) or (srcid == 11)) or (srcid == 12)) or (srcid == 13))", "", false);
+
+        // Additional filter on a partition key column (mrkt is passed as partition column in the next two calls)
+        q = query +
+                "b = filter a by ((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
+                "or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid", "mrkt"), "((((srcid == 10) or (srcid == 11)) or (srcid == 12)) and (mrkt == 'US'))",
+                "((((srcid == 10) or (srcid == 11)) or (dstid == 7)) and (((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) " +
+                        "and ((dstid == 5) or (dstid == 6)))) or (srcid == 12)) and ((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) " +
+                "and ((dstid == 5) or (dstid == 6)))) or (dstid == 7))))");
+
+        q = query + "b = filter a by (mrkt == 'US' or mrkt == 'UK') and " +
+                "((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
+                "or (srcid == 12 and dstid == 7));" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid", "mrkt"), "(((mrkt == 'US') or (mrkt == 'UK')) and (((srcid == 10) or (srcid == 11)) or (srcid == 12)))",
+                "((((srcid == 10) or (srcid == 11)) or (dstid == 7)) and (((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (srcid == 12)) and ((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (dstid == 7))))");
+
+        // Additional filter on a non-partition key column (same filter as two scenarios above, but only srcid is a partition column)
+        q = query +
+                "b = filter a by ((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
+                "or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid"), "(((srcid == 10) or (srcid == 11)) or (srcid == 12))",
+                "(((((srcid == 10) or (srcid == 11)) or (dstid == 7)) " +
+                        "and (((((srcid == 10) or (dstid == 6)) and " +
+                        "(((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (srcid == 12)) " +
+                        "and ((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) " +
+                "and ((dstid == 5) or (dstid == 6)))) or (dstid == 7)))) and (mrkt == 'US'))");
+
+        // Case of OR and AND: the same script is checked once with srcid and once with mrkt as the partition column
+        q = query +
+                "b = filter a by (mrkt == 'US' or mrkt == 'UK') and " +
+                "(srcid == 11 or srcid == 10) and (dstid == 5 or dstid == 6);" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid"),
+                "((srcid == 11) or (srcid == 10))",
+                "(((mrkt == 'US') or (mrkt == 'UK')) and ((dstid == 5) or (dstid == 6)))");
+        test(q, Arrays.asList("mrkt"),
+                "((mrkt == 'US') or (mrkt == 'UK'))",
+                "(((srcid == 11) or (srcid == 10)) and ((dstid == 5) or (dstid == 6)))");
+    }
+
+    private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
+        PigServer pigServer = new PigServer( pc );
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
+        PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
+        optimizer.optimize();
+        return newLogicalPlan;
+    }
+
+    /**
+     * Optimizes {@code q}, then checks the value found in the static TestLoader.partFilter field (reset
+     * to null first; must still be null when {@code partFilter} is null) and the filter left in the plan
+     * (no LOFilter at all when {@code filterExpr} is null). NOTE(review): {@code unsupportedExpr} is never read here.
+     */
+    private void testFull(String q, String partFilter, String filterExpr, boolean unsupportedExpr) throws Exception {
+        TestLoader.partFilter = null;
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
+
+        if (partFilter != null) {
+            // fail with a readable message rather than an NPE when nothing was recorded
+            Assert.assertNotNull("no partition filter was set on the loader", TestLoader.partFilter);
+            Assert.assertEquals("checking partition filter:", partFilter, TestLoader.partFilter.toString());
+        } else {
+            Assert.assertNull("checking that no partition filter was set:", TestLoader.partFilter);
+        }
+        if (filterExpr != null) {
+            Operator op = newLogicalPlan.getSinks().get(0);
+            LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
+            String actual = FilterExtractor.getExpression((LogicalExpression) filter.getFilterPlan().
+                    getSources().get(0)).toString();
+            Assert.assertEquals("checking trimmed filter expression:", filterExpr, actual);
+        } else {
+            Iterator<Operator> it = newLogicalPlan.getOperators();
+            while( it.hasNext() ) {
+                Assert.assertFalse("Checking that filter has been removed since it contained" +
+                        " only conditions on partition cols:", (it.next() instanceof LOFilter));
+            }
+        }
+    }
+
+    /**
+     * Checks that the partition column names of the loader's own schema (mrkt, srcid)
+     * reach the partition filter when the "as" clause of the load statement renames
+     * the columns (f1..f5); the remaining filter keeps the renamed aliases.
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping1() throws Exception {
+        final String script = "a = load 'foo' using "
+                + TestLoader.class.getName() +
+                "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+                "'srcid,mrkt') as (f1, f2, f3, f4, f5);" +
+                "b = filter a by " +
+                "(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" +
+                "store b into 'out';";
+        final String pushed = "((mrkt == 'us') and (srcid == 10))";
+        testFull(script, pushed, "((f5 >= 20) and (f3 == 15))", false);
+    }
+
+    /**
+     * Column renaming through the "as" clause of the load statement, with a filter
+     * that has no condition on the partition column (f1, i.e. srcid): no partition
+     * filter is expected to be recorded (null is passed) and the filter condition
+     * should remain as is, expressed with the renamed aliases.
+     * See testFull for how the null expectation is verified.
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping2() throws Exception {
+        final String script = "a = load 'foo' using "
+                + TestLoader.class.getName() +
+                "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+                "'srcid') as (f1, f2, f3, f4, f5);" +
+                "b = filter a by " +
+                "f5 >= 20 and f2 == 'us' and f3 == 15;" +
+                "store b into 'out';";
+        final String remaining = "(((f5 >= 20) and (f2 == 'us')) and (f3 == 15))";
+        testFull(script, null, remaining, false);
+    }
+
+    /**
+     * Column renaming through the "as" clause of the load statement, with a filter
+     * that only has conditions on partition columns: the whole condition is expected
+     * in the partition filter, written with the loader's column names, and no filter
+     * expression is expected to remain (null is passed).
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping3() throws Exception {
+        final String script = "a = load 'foo' using "
+                + TestLoader.class.getName() +
+                "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+                "'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);" +
+                "b = filter a by " +
+                "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);" +
+                "store b into 'out';";
+
+        final String pushed = "(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " +
+                "(dstid == 15)))";
+        testFull(script, pushed, null, false);
+    }
+
+    /**
+     * Column renaming through the "as" clause of the load statement where only a
+     * prefix of the columns is renamed (f1, f2, f3) and the rest keep the names of
+     * the schema returned by {@link LoadMetadata#getSchema(String, Configuration)}.
+     * The partition filter is expected with the loader's names (mrkt, srcid), the
+     * remaining filter with the aliases of the load statement.
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping4() throws Exception {
+        final String script = "a = load 'foo' using "
+                + TestLoader.class.getName() +
+                "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+                "'srcid,mrkt') as (f1:int, f2:chararray, f3:int, name:chararray, age:int);" +
+                "b = filter a by " +
+                "(age >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" + "store b into 'out';";
+        final String pushed = "((mrkt == 'us') and (srcid == 10))";
+        testFull(script, pushed, "((age >= 20) and (f3 == 15))", false);
+    }
+
+    /**
+     * Test PIG-1267: two TestLoader loads are each filtered on srcid and then joined; after
+     * optimization no LOFilter may remain and the plan is expected to hold exactly 5 operators.
+     */
+    @Test
+    public void testColNameMapping5() throws Exception {
+        TestLoader.partFilter = null;
+        String q = "a = load 'foo' using "
+                + TestLoader.class.getName() +
+                "('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " +
+                "'srcid');" +
+                "b = load 'bar' using "
+                + TestLoader.class.getName() +
+                "('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," +
+                "'srcid');" +
+                "a1 = filter a by srcid == 10;" +
+                "b1 = filter b by srcid == 20;"+
+                "c = join a1 by bcookie, b1 by bcookie;" +
+                "d = foreach c generate $4 as bcookie:chararray, " +
+                "$5 as dstid:int, $0 as mrkt:chararray;" +
+                "store d into 'out';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
+        // partFilter is one static field shared by both loads, so either filter may be the one found (presumably the last set)
+        Assert.assertNotNull("no partition filter was set on the loader", TestLoader.partFilter);
+        String partFilter = TestLoader.partFilter.toString();
+        Assert.assertTrue( "(srcid == 20)".equals( partFilter ) ||  "(srcid == 10)".equals( partFilter ) );
+        int counter = 0;
+        Iterator<Operator> iter = newLogicalPlan.getOperators();
+        while (iter.hasNext()) {
+            Assert.assertFalse("filter should have been removed", iter.next() instanceof LOFilter);
+            counter++;
+        }
+        Assert.assertEquals("number of operators left in the plan", 5, counter);
+    }
+
+    /**
+     * Test PIG-2778: the matches operator takes part in predicate pushdown
+     * @throws Exception
+     */
+    @Test
+    public void testMatchOpPushDown() throws Exception {
+        final String script = query + "b = filter a by name matches 'foo*';" + "store b into 'out';";
+        final String matchExpr = "(name matches 'foo*')";
+
+        // regexp condition on a partition col
+        test(script, Arrays.asList("name"), matchExpr, null);
+        // regexp condition on a non-partition col
+        test(script, Arrays.asList("srcid"), null, matchExpr);
+    }
+
+    /**
+     * Regression test for PIG-3395 (large filter expression makes Pig hang).
+     * Feeds a filter made of 23 three-way conjunctions joined by "or", where
+     * every referenced column is a partition column, and expects the whole
+     * condition to come back as the partition filter with nothing left over.
+     *
+     * @throws Exception propagated from the {@code test} helper
+     */
+    @Test
+    public void testLargeAndOrCondition() throws Exception {
+        final List<String> partitionCols = Arrays.asList("srcid", "mrkt", "dstid");
+
+        // The filter statement under test.
+        final String filterStatement = "b = filter a by " +
+                "(srcid == 1 and mrkt == '2' and dstid == 3) " +
+                "or (srcid == 4 and mrkt == '5' and dstid == 6) " +
+                "or (srcid == 7 and mrkt == '8' and dstid == 9) " +
+                "or (srcid == 10 and mrkt == '11' and dstid == 12) " +
+                "or (srcid == 13 and mrkt == '14' and dstid == 15) " +
+                "or (srcid == 16 and mrkt == '17' and dstid == 18) " +
+                "or (srcid == 19 and mrkt == '20' and dstid == 21) " +
+                "or (srcid == 22 and mrkt == '23' and dstid == 24) " +
+                "or (srcid == 25 and mrkt == '26' and dstid == 27) " +
+                "or (srcid == 28 and mrkt == '29' and dstid == 30) " +
+                "or (srcid == 31 and mrkt == '32' and dstid == 33) " +
+                "or (srcid == 34 and mrkt == '35' and dstid == 36) " +
+                "or (srcid == 37 and mrkt == '38' and dstid == 39) " +
+                "or (srcid == 40 and mrkt == '41' and dstid == 42) " +
+                "or (srcid == 43 and mrkt == '44' and dstid == 45) " +
+                "or (srcid == 46 and mrkt == '47' and dstid == 48) " +
+                "or (srcid == 49 and mrkt == '50' and dstid == 51) " +
+                "or (srcid == 52 and mrkt == '53' and dstid == 54) " +
+                "or (srcid == 55 and mrkt == '56' and dstid == 57) " +
+                "or (srcid == 58 and mrkt == '59' and dstid == 60) " +
+                "or (srcid == 61 and mrkt == '62' and dstid == 63) " +
+                "or (srcid == 64 and mrkt == '65' and dstid == 66) " +
+                "or (srcid == 67 and mrkt == '68' and dstid == 69);";
+
+        // Expected string form of the extracted partition filter.
+        final String expectedPartitionFilter =
+                "(((((((((((((((((((((((((srcid == 1) and (mrkt == '2')) and (dstid == 3)) " +
+                "or (((srcid == 4) and (mrkt == '5')) and (dstid == 6))) " +
+                "or (((srcid == 7) and (mrkt == '8')) and (dstid == 9))) " +
+                "or (((srcid == 10) and (mrkt == '11')) and (dstid == 12))) " +
+                "or (((srcid == 13) and (mrkt == '14')) and (dstid == 15))) " +
+                "or (((srcid == 16) and (mrkt == '17')) and (dstid == 18))) " +
+                "or (((srcid == 19) and (mrkt == '20')) and (dstid == 21))) " +
+                "or (((srcid == 22) and (mrkt == '23')) and (dstid == 24))) " +
+                "or (((srcid == 25) and (mrkt == '26')) and (dstid == 27))) " +
+                "or (((srcid == 28) and (mrkt == '29')) and (dstid == 30))) " +
+                "or (((srcid == 31) and (mrkt == '32')) and (dstid == 33))) " +
+                "or (((srcid == 34) and (mrkt == '35')) and (dstid == 36))) " +
+                "or (((srcid == 37) and (mrkt == '38')) and (dstid == 39))) " +
+                "or (((srcid == 40) and (mrkt == '41')) and (dstid == 42))) " +
+                "or (((srcid == 43) and (mrkt == '44')) and (dstid == 45))) " +
+                "or (((srcid == 46) and (mrkt == '47')) and (dstid == 48))) " +
+                "or (((srcid == 49) and (mrkt == '50')) and (dstid == 51))) " +
+                "or (((srcid == 52) and (mrkt == '53')) and (dstid == 54))) " +
+                "or (((srcid == 55) and (mrkt == '56')) and (dstid == 57))) " +
+                "or (((srcid == 58) and (mrkt == '59')) and (dstid == 60))) " +
+                "or (((srcid == 61) and (mrkt == '62')) and (dstid == 63))) " +
+                "or (((srcid == 64) and (mrkt == '65')) and (dstid == 66))) " +
+                "or (((srcid == 67) and (mrkt == '68')) and (dstid == 69)))";
+
+        // A null remaining-filter expectation makes the helper assert that the
+        // original filter is removable.
+        test(query + filterStatement + "store b into 'out';",
+                partitionCols, expectedPartitionFilter, null);
+    }
+
+    // An "or" that mixes partition-column clauses with a clause on a
+    // non-partition column (name) must not be reported as pushable.
+    @Test
+    public void testOrWithNonPartitionCondition() throws Exception {
+        final List<String> partitionCols = Arrays.asList("srcid", "mrkt", "dstid");
+        final String filterStatement = "b = filter a by " +
+                "(srcid == 1 and mrkt == '2' and dstid == 3) " +
+                "or (srcid == 4 and mrkt == '5' and dstid == 6) " +
+                "or (srcid == 7 and mrkt == '8' and dstid == 9) " +
+                "or (name is null);";
+        final String storeStatement = "store b into 'out';";
+
+        negativeTest(query + filterStatement + storeStatement, partitionCols);
+    }
+
+    //// helper methods ///////
+    private FilterExtractor test(String query, List<String> partitionCols,
+            String expPartFilterString, String expFilterString)
+                    throws Exception {
+        return test(query, partitionCols, expPartFilterString, expFilterString, false);
+    }
+
+    /**
+     * Builds the logical plan for {@code query}, runs a {@link FilterExtractor}
+     * over the filter feeding the first sink, and checks both halves of the
+     * result.
+     *
+     * @param query                 Pig script to build the logical plan from
+     * @param partitionCols         names of the partition columns
+     * @param expPartFilterString   expected string form of the partition
+     *                              condition; null means the condition itself
+     *                              must be null
+     * @param expFilterString       expected string form of the remaining
+     *                              filter; null means the filter must be
+     *                              reported as removable
+     * @param unsupportedExpression when true the remaining filter is rendered
+     *                              with {@code getTestExpression} instead of
+     *                              {@code FilterExtractor.getExpression}
+     * @return the extractor after it has visited the filter plan
+     * @throws Exception if plan construction, the visit, or rendering fails
+     */
+    private FilterExtractor test(String query, List<String> partitionCols,
+            String expPartFilterString, String expFilterString, boolean unsupportedExpression)
+                    throws Exception {
+        LogicalPlan plan = Util.buildLp(new PigServer( pc ), query);
+        Operator sink = plan.getSinks().get(0);
+        LOFilter loFilter = (LOFilter) plan.getPredecessors(sink).get(0);
+
+        FilterExtractor extractor =
+                new FilterExtractor(loFilter.getFilterPlan(), partitionCols);
+        extractor.visit();
+
+        // First half: the extracted partition condition.
+        if (expPartFilterString != null) {
+            Assert.assertEquals("Checking partition column filter:",
+                    expPartFilterString,
+                    extractor.getPColCondition().toString());
+        } else {
+            Assert.assertEquals("Checking partition column filter:", null,
+                    extractor.getPColCondition());
+        }
+
+        // Second half: either nothing remains, or the remainder must match.
+        if (expFilterString == null) {
+            Assert.assertTrue("Check that filter can be removed:",
+                    extractor.isFilterRemovable());
+            return extractor;
+        }
+
+        LogicalExpression remaining =
+                (LogicalExpression) extractor.getFilteredPlan().getSources().get(0);
+        String actual = unsupportedExpression
+                ? getTestExpression(remaining).toString()
+                : FilterExtractor.getExpression(remaining).toString();
+        Assert.assertEquals("checking trimmed filter expression:", expFilterString, actual);
+        return extractor;
+    }
+
+    // Negative case: runs a FilterExtractor over the filter feeding the first
+    // sink of the plan built from the query and asserts that the extractor
+    // reports canPushDown() as false.
+    private void negativeTest(String query, List<String> partitionCols) throws Exception {
+        LogicalPlan plan = Util.buildLp(new PigServer( pc ), query);
+        Operator sink = plan.getSinks().get(0);
+        LOFilter loFilter = (LOFilter) plan.getPredecessors(sink).get(0);
+
+        FilterExtractor filterExtractor =
+                new FilterExtractor(loFilter.getFilterPlan(), partitionCols);
+        filterExtractor.visit();
+
+        boolean pushable = filterExtractor.canPushDown();
+        Assert.assertFalse(pushable);
+    }
+
+    /**
+     * Stub loader used only to check that partition column filters are handed
+     * to the loader with the expected column names. Only the metadata methods
+     * return anything meaningful; the data-reading methods are empty or
+     * return null.
+     */
+    public static class TestLoader extends LoadFunc implements LoadMetadata {
+
+        Schema schema;
+        String[] partCols;
+        // Static: the last filter passed to any instance is kept here.
+        static Expression partFilter = null;
+
+        /**
+         * @param schemaString          schema parsed via Utils.getSchemaFromString
+         * @param commaSepPartitionCols partition column names, split on ","
+         */
+        public TestLoader(String schemaString, String commaSepPartitionCols)
+                throws ParserException {
+            this.schema = Utils.getSchemaFromString(schemaString);
+            this.partCols = commaSepPartitionCols.split(",");
+        }
+
+        // ---- LoadMetadata ----
+
+        /** Returns the partition columns given to the constructor. */
+        @Override
+        public String[] getPartitionKeys(String location, Job job)
+                throws IOException {
+            return this.partCols;
+        }
+
+        /** Wraps the schema given to the constructor in a ResourceSchema. */
+        @Override
+        public ResourceSchema getSchema(String location, Job job)
+                throws IOException {
+            return new ResourceSchema(this.schema);
+        }
+
+        /** No statistics are provided; always returns null. */
+        @Override
+        public ResourceStatistics getStatistics(String location,
+                Job job) throws IOException {
+            return null;
+        }
+
+        /** Records the received filter in the static partFilter field. */
+        @Override
+        public void setPartitionFilter(Expression partitionFilter)
+                throws IOException {
+            partFilter = partitionFilter;
+        }
+
+        // ---- LoadFunc (stubs) ----
+
+        @Override
+        public InputFormat getInputFormat() throws IOException {
+            return null;
+        }
+
+        @Override
+        public Tuple getNext() throws IOException {
+            return null;
+        }
+
+        @Override
+        public void prepareToRead(RecordReader reader, PigSplit split)
+                throws IOException {
+        }
+
+        @Override
+        public void setLocation(String location, Job job) throws IOException {
+        }
+
+    }
+
+    /**
+     * Optimizer restricted to the rules these tests need: the partition filter
+     * rule followed by the load type-cast inserter, each in its own rule set.
+     */
+    public class MyPlanOptimizer extends LogicalPlanOptimizer {
+        protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
+            super( p, iterations, new HashSet<String>() );
+        }
+
+        /**
+         * @return the rule sets in order: partition filter push-down, then
+         *         load type-cast insertion
+         */
+        @Override
+        protected List<Set<Rule>> buildRuleSets() {
+            List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+
+            // Partition filter push-down rule.
+            Set<Rule> s = new HashSet<Rule>();
+            Rule r = new PartitionFilterOptimizer("NewPartitionFilterPushDown");
+            s.add(r);
+            ls.add(s);
+
+            // Load type-cast inserter rule.
+            r = new LoadTypeCastInserter( "LoadTypeCastInserter" );
+            s = new HashSet<Rule>();
+            s.add(r);
+            ls.add(s);
+
+            // Logical expression simplifier
+            // TODO enable this test after PIG-3465
+            /*
+            s = new HashSet<Rule>();
+            // add logical expression simplification rule
+            r = new LogicalExpressionSimplifier("FilterLogicExpressionSimplifier");
+            s.add(r);
+            ls.add(s);*/
+
+            return ls;
+        }
+    }
+
+    // Helper Functions
+    public LogicalPlan buildPlan(PigServer pigServer, String query) throws Exception {
+        try {
+            return Util.buildLp(pigServer, query);
+        } catch(Throwable t) {
+            throw new AssertionFailedError(t.getMessage());
+        }
+    }
+
+    /** Wraps the given text in a single pair of parentheses. */
+    private static String braketize(String input) {
+        StringBuilder wrapped = new StringBuilder();
+        wrapped.append("(");
+        wrapped.append(input);
+        wrapped.append(")");
+        return wrapped.toString();
+    }
+
+    /**
+     * Renders a logical expression tree as a string for test comparison.
+     * Handles constants, projections, binary expressions, casts (rendered as
+     * their inner expression), is-null, map lookups and dereferences.
+     *
+     * @param op expression to render; may be null
+     * @return the rendered string, or null when {@code op} is null
+     * @throws FrontendException for any other expression type
+     */
+    private static String getTestExpression(LogicalExpression op) throws FrontendException {
+        if (op == null) {
+            return null;
+        }
+        if (op instanceof ConstantExpression) {
+            return String.valueOf(((ConstantExpression) op).getValue());
+        }
+        if (op instanceof ProjectExpression) {
+            return ((ProjectExpression) op).getFieldSchema().alias;
+        }
+        if (op instanceof BinaryExpression) {
+            BinaryExpression binary = (BinaryExpression) op;
+            String left = getTestExpression(binary.getLhs());
+            String right = getTestExpression(binary.getRhs());
+            String symbol;
+            if (op instanceof EqualExpression) {
+                symbol = " == ";
+            } else if (op instanceof AndExpression) {
+                symbol = " and ";
+            } else if (op instanceof OrExpression) {
+                symbol = " or ";
+            } else {
+                // Any other binary operator is rendered by its name.
+                symbol = op.getName();
+            }
+            return braketize(left + symbol + right);
+        }
+        if (op instanceof CastExpression) {
+            // Casts are transparent: only the inner expression is rendered.
+            return getTestExpression(((CastExpression) op).getExpression());
+        }
+        if (op instanceof IsNullExpression) {
+            String inner = getTestExpression(((IsNullExpression) op).getExpression());
+            return braketize(inner + " is null");
+        }
+        if (op instanceof MapLookupExpression) {
+            MapLookupExpression lookup = (MapLookupExpression) op;
+            String mapText = getTestExpression(lookup.getMap());
+            String lookupKey = lookup.getLookupKey();
+            return mapText + "#'" + lookupKey + "'";
+        }
+        if (op instanceof DereferenceExpression) {
+            DereferenceExpression deref = (DereferenceExpression) op;
+            String referred = getTestExpression(deref.getReferredExpression());
+            // Only the first dereferenced column is rendered.
+            int firstColumn = deref.getBagColumns().get(0);
+            return referred + ".$" + String.valueOf(firstColumn);
+        }
+        throw new FrontendException("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
+    }
+}

Modified: pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java?rev=1525970&r1=1525969&r2=1525970&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java Tue Sep 24 18:59:53 2013
@@ -70,6 +70,7 @@ import org.junit.Test;
  * condition in the filter following a load which talks to metadata system (.i.e.
  * implements {@link LoadMetadata})
  */
+@Deprecated
 public class TestPartitionFilterPushDown {
     static PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
     String query = "a = load 'foo' as (srcid:int, mrkt:chararray, dstid:int, name:chararray, " +