You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by an...@apache.org on 2013/09/24 21:04:29 UTC
svn commit: r1525974 - in /pig/branches/branch-0.12: ./ conf/
src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/optimizer/
src/org/apache/pig/newplan/logical/rules/ tes...
Author: aniket486
Date: Tue Sep 24 19:04:28 2013
New Revision: 1525974
URL: http://svn.apache.org/r1525974
Log:
PIG-3461: Rewrite PartitionFilterOptimizer to make it work for all the cases (aniket486)
Added:
pig/branches/branch-0.12/src/org/apache/pig/newplan/FilterExtractor.java
pig/branches/branch-0.12/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java
Modified:
pig/branches/branch-0.12/CHANGES.txt
pig/branches/branch-0.12/conf/pig.properties
pig/branches/branch-0.12/src/org/apache/pig/PigConstants.java
pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
pig/branches/branch-0.12/src/org/apache/pig/newplan/PColFilterExtractor.java
pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
pig/branches/branch-0.12/test/org/apache/pig/test/TestPartitionFilterPushDown.java
Modified: pig/branches/branch-0.12/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/CHANGES.txt?rev=1525974&r1=1525973&r2=1525974&view=diff
==============================================================================
--- pig/branches/branch-0.12/CHANGES.txt (original)
+++ pig/branches/branch-0.12/CHANGES.txt Tue Sep 24 19:04:28 2013
@@ -30,6 +30,8 @@ PIG-3174: Remove rpm and deb artifacts f
IMPROVEMENTS
+PIG-3461: Rewrite PartitionFilterOptimizer to make it work for all the cases (aniket486)
+
PIG-2417: Streaming UDFs - allow users to easily write UDFs in scripting languages with no
JVM implementation. (jeremykarn via daijy)
Modified: pig/branches/branch-0.12/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/conf/pig.properties?rev=1525974&r1=1525973&r2=1525974&view=diff
==============================================================================
--- pig/branches/branch-0.12/conf/pig.properties (original)
+++ pig/branches/branch-0.12/conf/pig.properties Tue Sep 24 19:04:28 2013
@@ -216,3 +216,7 @@ pig.location.check.strict=false
# This option is used to define whether to support recovery to handle the
# application master getting restarted.
# pig.output.committer.recovery.support=true
+
+# Set this option to true if you need to use the old partition filter optimizer.
+# Note: The old partition filter optimizer (based on PColFilterExtractor) will be deprecated in the future.
+# pig.exec.useOldPartitionFilterOptimizer=true
Modified: pig/branches/branch-0.12/src/org/apache/pig/PigConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/PigConstants.java?rev=1525974&r1=1525973&r2=1525974&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/PigConstants.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/PigConstants.java Tue Sep 24 19:04:28 2013
@@ -46,4 +46,11 @@ public class PigConstants {
* by default, all rules are enabled.
*/
public static final String PIG_OPTIMIZER_RULES_DISABLED_KEY = "pig.optimizer.rules.disabled";
+
+ /**
+ * flag to use old PartitionFilterOptimizer in case NewPartitionFilterOptimizer is not backwards compatible
+ * (A known case is "filter a by 1 == 0").
+ */
+ public static final String PIG_EXEC_OLD_PART_FILTER_OPTIMIZER = "pig.exec.useOldPartitionFilterOptimizer";
+
}
\ No newline at end of file
Modified: pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1525974&r1=1525973&r2=1525974&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Sep 24 19:04:28 2013
@@ -260,6 +260,14 @@ public abstract class HExecutionEngine i
disabledOptimizerRules = new HashSet<String>();
}
+ if( ! Boolean.valueOf(this.pigContext.getProperties().getProperty(
+ PigConstants.PIG_EXEC_OLD_PART_FILTER_OPTIMIZER, "false"))){
+ // Turn off the old partition filter optimizer
+ disabledOptimizerRules.add("PartitionFilterOptimizer");
+ } else {
+ disabledOptimizerRules.add("NewPartitionFilterOptimizer");
+ }
+
String pigOptimizerRulesDisabled = this.pigContext.getProperties()
.getProperty(PigConstants.PIG_OPTIMIZER_RULES_DISABLED_KEY);
if (pigOptimizerRulesDisabled != null) {
@@ -270,6 +278,7 @@ public abstract class HExecutionEngine i
if (pigContext.inIllustrator) {
disabledOptimizerRules.add("MergeForEach");
disabledOptimizerRules.add("PartitionFilterOptimizer");
+ disabledOptimizerRules.add("NewPartitionFilterOptimizer");
disabledOptimizerRules.add("LimitOptimizer");
disabledOptimizerRules.add("SplitFilter");
disabledOptimizerRules.add("PushUpFilter");
Added: pig/branches/branch-0.12/src/org/apache/pig/newplan/FilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/newplan/FilterExtractor.java?rev=1525974&view=auto
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/newplan/FilterExtractor.java (added)
+++ pig/branches/branch-0.12/src/org/apache/pig/newplan/FilterExtractor.java Tue Sep 24 19:04:28 2013
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.Expression;
+import org.apache.pig.Expression.OpType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.logical.expression.AddExpression;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.BinaryExpression;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.DivideExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
+import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.LessThanExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ModExpression;
+import org.apache.pig.newplan.logical.expression.MultiplyExpression;
+import org.apache.pig.newplan.logical.expression.NotEqualExpression;
+import org.apache.pig.newplan.logical.expression.OrExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.RegexExpression;
+import org.apache.pig.newplan.logical.expression.SubtractExpression;
+
+/**
+ * This is a rewrite of {@code PColFilterExtractor}
+ *
+ * We traverse the expression plan bottom up and separate it into two plans
+ * - pushdownExprPlan, plan that can be pushed down to the loader and
+ * - filterExprPlan, remaining plan that needs to be evaluated by pig
+ *
+ */
+public class FilterExtractor {
+
+ private static final Log LOG = LogFactory.getLog(FilterExtractor.class);
+
+ /**
+ * partition columns associated with the table
+ * present in the load on which the filter whose
+ * inner plan is being visited is applied
+ */
+ private List<String> partitionCols;
+
+ /**
+ * We visit this plan to create the filteredPlan
+ */
+ protected LogicalExpressionPlan originalPlan;
+
+ /**
+ * Plan that is created after all pushable filters are removed
+ */
+ protected LogicalExpressionPlan filteredPlan;
+
+ /**
+ * Plan that can be pushed down
+ */
+ protected LogicalExpressionPlan pushdownExprPlan;
+
+ /**
+ * Final filterExpr after we are done
+ */
+ private LogicalExpression filterExpr = null;
+
+ /**
+ * {@code Expression} to push down
+ */
+ private Expression pushdownExpr = null;
+
+ /**
+ *
+ * @param plan logical plan corresponding to the filter's comparison condition
+ * @param partitionCols list of partition columns of the table which is
+ * being loaded in the LOAD statement which is input to the filter
+ */
+ public FilterExtractor(LogicalExpressionPlan plan,
+ List<String> partitionCols) {
+ this.originalPlan = plan;
+ this.partitionCols = new ArrayList<String>(partitionCols);
+ this.filteredPlan = new LogicalExpressionPlan();
+ this.pushdownExprPlan = new LogicalExpressionPlan();
+ }
+
+ public void visit() throws FrontendException {
+ // we will visit the leaf and it will recursively walk the plan
+ LogicalExpression leaf = (LogicalExpression)originalPlan.getSources().get( 0 );
+ // if the leaf is a unary operator it should be a FilterFunc in
+ // which case we don't try to extract partition filter conditions
+ if(leaf instanceof BinaryExpression) {
+ // recursively traverse the tree bottom up
+ // checkPushDown returns a KeyState: a (pushdownExpr, filterExpr) pair of LogicalExpressions
+ BinaryExpression binExpr = (BinaryExpression)leaf;
+ KeyState finale = checkPushDown(binExpr);
+ this.filterExpr = finale.filterExpr;
+ this.pushdownExpr = getExpression(finale.pushdownExpr);
+ }
+ }
+
+ /**
+ * @return new filtered plan after pushdownable filters are removed
+ */
+ public LogicalExpressionPlan getFilteredPlan() {
+ return filteredPlan;
+ }
+
+ /**
+ * @return true if pushdown is possible
+ */
+ public boolean canPushDown() {
+ return pushdownExpr != null;
+ }
+
+ /**
+ * @return true if no residual filter expression was recorded (filterExpr is null); only meaningful when canPushDown() is true
+ */
+ public boolean isFilterRemovable() {
+ return filterExpr == null;
+ }
+
+ /**
+ * @return the condition on partition columns extracted from filter
+ */
+ public Expression getPColCondition(){
+ return pushdownExpr;
+ }
+
+ private class KeyState {
+ LogicalExpression pushdownExpr;
+ LogicalExpression filterExpr;
+ }
+
+ private KeyState checkPushDown(LogicalExpression op) throws FrontendException {
+ // Note: Currently, Expression interface only understands 3 Expression Types
+ // (Look at getExpression below) BinaryExpression, ProjectExpression and ConstantExpression
+ if(op instanceof ProjectExpression) {
+ return checkPushDown((ProjectExpression)op);
+ } else if (op instanceof BinaryExpression) {
+ return checkPushDown((BinaryExpression)op);
+ } else if (op instanceof ConstantExpression) {
+ // Constants can be pushed down
+ KeyState state = new KeyState();
+ state.pushdownExpr = op;
+ state.filterExpr = null;
+ return state;
+ } else {
+ KeyState state = new KeyState();
+ state.pushdownExpr = null;
+ state.filterExpr = addToFilterPlan(op);
+ return state;
+ }
+ }
+
+ private LogicalExpression addToFilterPlan(LogicalExpression op) throws FrontendException {
+ // This copies the whole tree underneath op
+ LogicalExpression newOp = op.deepCopy(filteredPlan);
+ return newOp;
+ }
+
+ private LogicalExpression andLogicalExpressions(
+ LogicalExpressionPlan plan, LogicalExpression a, LogicalExpression b) {
+ if (a == null) {
+ return b;
+ }
+ if (b == null) {
+ return a;
+ }
+ LogicalExpression andOp = new AndExpression(plan, a, b);
+ return andOp;
+ }
+
+ private LogicalExpression orLogicalExpressions(
+ LogicalExpressionPlan plan, LogicalExpression a, LogicalExpression b) {
+ // Or 2 operators if they are not null
+ if (a == null || b == null) {
+ return null;
+ }
+ LogicalExpression orOp = new OrExpression(plan, a, b);
+ return orOp;
+ }
+
+ private KeyState checkPushDown(BinaryExpression binExpr) throws FrontendException {
+ KeyState state = new KeyState();
+ KeyState leftState = checkPushDown(binExpr.getLhs());
+ KeyState rightState = checkPushDown(binExpr.getRhs());
+
+ if (binExpr instanceof AndExpression) {
+ // AND is commutative
+ // Expression =
+ // (leftState.pushdownExpr AND leftState.filterExpr)
+ // AND (rightState.pushdownExpr AND rightState.filterExpr)
+ //
+ // pushDownExpr = (leftState.pushdownExpr AND rightState.pushdownExpr)
+ // filterExpr = (leftState.filterExpr AND rightState.filterExpr)
+ state.pushdownExpr = andLogicalExpressions(pushdownExprPlan, leftState.pushdownExpr, rightState.pushdownExpr);
+ state.filterExpr = andLogicalExpressions(filteredPlan, leftState.filterExpr, rightState.filterExpr);
+ } else if (binExpr instanceof OrExpression) {
+ // Expression =
+ // (leftState.pushdownExpr AND leftState.filterExpr)
+ // OR (rightState.pushdownExpr AND rightState.filterExpr)
+ //
+ // This could be rewritten with distributive property as
+ // (leftState.pushdownExpr OR rightState.pushdownExpr)
+ // AND
+ // ( (leftState.pushdownExpr OR rightState.filterExpr)
+ // AND (leftState.filterExpr OR rightState.pushdownExpr)
+ // AND (leftState.filterExpr OR rightState.filterExpr)
+ // )
+ // In other words,
+ // pushdownExpr = leftState.pushdownExpr OR rightState.pushdownExpr
+ // filterExpr = (leftState.pushdownExpr OR rightState.filterExpr)
+ // AND (leftState.filterExpr OR rightState.pushdownExpr)
+ // AND (leftState.filterExpr OR rightState.filterExpr)
+ state.pushdownExpr = orLogicalExpressions(pushdownExprPlan, leftState.pushdownExpr, rightState.pushdownExpr);
+ if(state.pushdownExpr == null) {
+ // Whatever we did so far on the right tree is all wasted :(
+ // Undo all the mutation (AND OR distributions) until now
+ removeFromFilteredPlan(leftState.filterExpr);
+ removeFromFilteredPlan(rightState.filterExpr);
+ state.filterExpr = addToFilterPlan(binExpr);
+ } else {
+ LogicalExpression f1 = orLogicalExpressions(filteredPlan, leftState.pushdownExpr, rightState.filterExpr);
+ LogicalExpression f2 = orLogicalExpressions(filteredPlan, leftState.filterExpr, rightState.pushdownExpr);
+ LogicalExpression f3 = orLogicalExpressions(filteredPlan, leftState.filterExpr, rightState.filterExpr);
+ state.filterExpr = andLogicalExpressions(filteredPlan, f1, andLogicalExpressions(filteredPlan, f2, f3));
+ }
+ } else {
+ // leftState OP rightState
+ if (leftState.filterExpr == null && rightState.filterExpr == null) {
+ state.pushdownExpr = binExpr;
+ state.filterExpr = null;
+ } else {
+ state.pushdownExpr = null;
+ removeFromFilteredPlan(leftState.filterExpr);
+ removeFromFilteredPlan(rightState.filterExpr);
+ state.filterExpr = addToFilterPlan(binExpr);
+ }
+ }
+ return state;
+ }
+
+ private KeyState checkPushDown(ProjectExpression project) throws FrontendException {
+ String fieldName = project.getFieldSchema().alias;
+ KeyState state = new KeyState();
+ if(partitionCols.contains(fieldName)) {
+ state.filterExpr = null;
+ state.pushdownExpr = project;
+ } else {
+ state.filterExpr = addToFilterPlan(project);
+ state.pushdownExpr = null;
+ }
+ return state;
+ }
+
+ /**
+ * Assume that the given operator is already disconnected from its predecessors.
+ * @param op
+ * @throws FrontendException
+ */
+ private void removeFromFilteredPlan(Operator op) throws FrontendException {
+ List<Operator> succs = filteredPlan.getSuccessors( op );
+ if( succs == null ) {
+ filteredPlan.remove( op );
+ return;
+ }
+
+ Operator[] children = new Operator[succs.size()];
+ for( int i = 0; i < succs.size(); i++ ) {
+ children[i] = succs.get(i);
+ }
+
+ for( Operator succ : children ) {
+ filteredPlan.disconnect( op, succ );
+ removeFromFilteredPlan( succ );
+ }
+
+ filteredPlan.remove( op );
+ }
+
+ public static Expression getExpression(LogicalExpression op) throws FrontendException
+ {
+ if(op == null) {
+ return null;
+ }
+ if(op instanceof ConstantExpression) {
+ ConstantExpression constExpr =(ConstantExpression)op ;
+ return new Expression.Const( constExpr.getValue() );
+ } else if (op instanceof ProjectExpression) {
+ ProjectExpression projExpr = (ProjectExpression)op;
+ String fieldName = projExpr.getFieldSchema().alias;
+ return new Expression.Column(fieldName);
+ } else {
+ if( !( op instanceof BinaryExpression ) ) {
+ LOG.error("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
+ throw new FrontendException("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
+ }
+ BinaryExpression binOp = (BinaryExpression)op;
+ if(binOp instanceof AddExpression) {
+ return getExpression( binOp, OpType.OP_PLUS );
+ } else if(binOp instanceof SubtractExpression) {
+ return getExpression(binOp, OpType.OP_MINUS);
+ } else if(binOp instanceof MultiplyExpression) {
+ return getExpression(binOp, OpType.OP_TIMES);
+ } else if(binOp instanceof DivideExpression) {
+ return getExpression(binOp, OpType.OP_DIV);
+ } else if(binOp instanceof ModExpression) {
+ return getExpression(binOp, OpType.OP_MOD);
+ } else if(binOp instanceof AndExpression) {
+ return getExpression(binOp, OpType.OP_AND);
+ } else if(binOp instanceof OrExpression) {
+ return getExpression(binOp, OpType.OP_OR);
+ } else if(binOp instanceof EqualExpression) {
+ return getExpression(binOp, OpType.OP_EQ);
+ } else if(binOp instanceof NotEqualExpression) {
+ return getExpression(binOp, OpType.OP_NE);
+ } else if(binOp instanceof GreaterThanExpression) {
+ return getExpression(binOp, OpType.OP_GT);
+ } else if(binOp instanceof GreaterThanEqualExpression) {
+ return getExpression(binOp, OpType.OP_GE);
+ } else if(binOp instanceof LessThanExpression) {
+ return getExpression(binOp, OpType.OP_LT);
+ } else if(binOp instanceof LessThanEqualExpression) {
+ return getExpression(binOp, OpType.OP_LE);
+ } else if(binOp instanceof RegexExpression) {
+ return getExpression(binOp, OpType.OP_MATCH);
+ } else {
+ LOG.error("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
+ throw new FrontendException("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
+ }
+ }
+ }
+
+ private static Expression getExpression(BinaryExpression binOp, OpType
+ opType) throws FrontendException {
+ return new Expression.BinaryExpression(getExpression(binOp.getLhs())
+ , getExpression(binOp.getRhs()), opType);
+ }
+}
Modified: pig/branches/branch-0.12/src/org/apache/pig/newplan/PColFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/newplan/PColFilterExtractor.java?rev=1525974&r1=1525973&r2=1525974&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/newplan/PColFilterExtractor.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/newplan/PColFilterExtractor.java Tue Sep 24 19:04:28 2013
@@ -64,6 +64,7 @@ import org.apache.pig.newplan.DepthFirst
* The condition on partition cols will be used to prune partitions of the table.
*
*/
+@Deprecated
public class PColFilterExtractor extends PlanVisitor {
private static final Log LOG = LogFactory.getLog(PColFilterExtractor.class);
Modified: pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1525974&r1=1525973&r2=1525974&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Tue Sep 24 19:04:28 2013
@@ -75,9 +75,8 @@ public class LogicalPlanOptimizer extend
}
protected List<Set<Rule>> buildRuleSets() {
- List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+ List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
-
// ImplicitSplitInserter set
// This set of rules Insert Foreach dedicated for casting after load
Set<Rule> s = new HashSet<Rule>();
@@ -94,7 +93,18 @@ public class LogicalPlanOptimizer extend
checkAndAddRule(s, r);
if (!s.isEmpty())
ls.add(s);
-
+
+ // Partition filter set
+ // This set of rules push partition filter to LoadFunc
+ // Important: do this before LogicalExpressionSimplifier so that size of filter can be reduced
+ // (However, it's not necessary to do it before LogicalExpressionSimplifier)
+ s = new HashSet<Rule>();
+ // Optimize partition filter
+ r = new PartitionFilterOptimizer("NewPartitionFilterOptimizer");
+ checkAndAddRule(s, r);
+ if (!s.isEmpty())
+ ls.add(s);
+
// Logical expression simplifier
s = new HashSet<Rule>();
// add logical expression simplification rule
@@ -112,7 +122,7 @@ public class LogicalPlanOptimizer extend
checkAndAddRule(s, r);
if (!s.isEmpty())
ls.add(s);
-
+
// Split Set
// This set of rules does splitting of operators only.
// It does not move operators
@@ -122,8 +132,8 @@ public class LogicalPlanOptimizer extend
checkAndAddRule(s, r);
if (!s.isEmpty())
ls.add(s);
-
-
+
+
// Push Set,
// This set does moving of operators only.
s = new HashSet<Rule>();
@@ -133,17 +143,17 @@ public class LogicalPlanOptimizer extend
checkAndAddRule(s, r);
if (!s.isEmpty())
ls.add(s);
-
+
// Merge Set
// This Set merges operators but does not move them.
s = new HashSet<Rule>();
checkAndAddRule(s, r);
// add merge filter rule
- r = new MergeFilter("MergeFilter");
+ r = new MergeFilter("MergeFilter");
checkAndAddRule(s, r);
if (!s.isEmpty())
ls.add(s);
-
+
// Partition filter set
// This set of rules push partition filter to LoadFunc
s = new HashSet<Rule>();
@@ -152,7 +162,7 @@ public class LogicalPlanOptimizer extend
checkAndAddRule(s, r);
if (!s.isEmpty())
ls.add(s);
-
+
// PushDownForEachFlatten set
s = new HashSet<Rule>();
// Add the PushDownForEachFlatten
@@ -160,7 +170,7 @@ public class LogicalPlanOptimizer extend
checkAndAddRule(s, r);
if (!s.isEmpty())
ls.add(s);
-
+
// Prune Set
// This set is used for pruning columns and maps
s = new HashSet<Rule>();
@@ -169,7 +179,7 @@ public class LogicalPlanOptimizer extend
checkAndAddRule(s, r);
if (!s.isEmpty())
ls.add(s);
-
+
// Add LOForEach set
s = new HashSet<Rule>();
// Add the AddForEach
@@ -177,7 +187,7 @@ public class LogicalPlanOptimizer extend
checkAndAddRule(s, r);
if (!s.isEmpty())
ls.add(s);
-
+
// Add MergeForEach set
s = new HashSet<Rule>();
// Add the MergeForEach
@@ -185,14 +195,14 @@ public class LogicalPlanOptimizer extend
checkAndAddRule(s, r);
if (!s.isEmpty())
ls.add(s);
-
+
//set parallelism to 1 for cogroup/group-by on constant
s = new HashSet<Rule>();
r = new GroupByConstParallelSetter("GroupByConstParallelSetter");
checkAndAddRule(s, r);
if(!s.isEmpty())
ls.add(s);
-
+
// Limit Set
// This set of rules push up limit
s = new HashSet<Rule>();
@@ -201,7 +211,7 @@ public class LogicalPlanOptimizer extend
checkAndAddRule(s, r);
if (!s.isEmpty())
ls.add(s);
-
+
return ls;
}
Modified: pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=1525974&r1=1525973&r2=1525974&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Tue Sep 24 19:04:28 2013
@@ -31,6 +31,7 @@ import org.apache.pig.LoadMetadata;
import org.apache.pig.Expression.BinaryExpression;
import org.apache.pig.Expression.Column;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.FilterExtractor;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.OperatorSubPlan;
@@ -95,11 +96,47 @@ public class PartitionFilterOptimizer ex
@Override
public Transformer getNewTransformer() {
- return new PartitionFilterPushDownTransformer();
+ if(name.equals("PartitionFilterOptimizer")) {
+ return new PartitionFilterPushDownTransformer();
+ } else {
+ return new NewPartitionFilterPushDownTransformer();
+ }
}
-
+
+ public class NewPartitionFilterPushDownTransformer extends PartitionFilterPushDownTransformer {
+ @Override
+ public void transform(OperatorPlan matched) throws FrontendException {
+ subPlan = new OperatorSubPlan( currentPlan );
+
+ setupColNameMaps();
+
+ FilterExtractor filterFinder = new FilterExtractor(
+ loFilter.getFilterPlan(), getMappedKeys( partitionKeys ) );
+ filterFinder.visit();
+ Expression partitionFilter = filterFinder.getPColCondition();
+
+ if(partitionFilter != null) {
+ // the column names in the filter may be the ones provided by
+ // the user in the schema in the load statement - we may need
+ // to replace them with partition column names as given by
+ // LoadFunc.getSchema()
+ updateMappedColNames(partitionFilter);
+ try {
+ loadMetadata.setPartitionFilter(partitionFilter);
+ } catch (IOException e) {
+ throw new FrontendException( e );
+ }
+ if(filterFinder.isFilterRemovable()) {
+ currentPlan.removeAndReconnect( loFilter );
+ } else {
+ loFilter.setFilterPlan(filterFinder.getFilteredPlan());
+ }
+ }
+ }
+ }
+
public class PartitionFilterPushDownTransformer extends Transformer {
- private OperatorSubPlan subPlan;
+ protected OperatorSubPlan subPlan;
@Override
public boolean check(OperatorPlan matched) throws FrontendException {
@@ -128,12 +165,9 @@ public class PartitionFilterOptimizer ex
throw new FrontendException( e );
}
if( partitionKeys == null || partitionKeys.length == 0 ) {
- return false;
+ return false;
}
-// LogicalExpressionPlan filterExpr = filter.getFilterPlan();
-
- // we found a load-filter pattern where the load returns partition keys
return true;
}
@@ -174,12 +208,12 @@ public class PartitionFilterOptimizer ex
if(pColFilterFinder.isFilterRemovable()) {
currentPlan.removeAndReconnect( loFilter );
} else {
- loFilter.setFilterPlan(filterExprCopy);
- }
- }
+ loFilter.setFilterPlan(filterExprCopy);
+ }
+ }
}
- private void updateMappedColNames(Expression expr) {
+ protected void updateMappedColNames(Expression expr) {
if(expr instanceof BinaryExpression) {
updateMappedColNames(((BinaryExpression) expr).getLhs());
updateMappedColNames(((BinaryExpression) expr).getRhs());
@@ -198,7 +232,7 @@ public class PartitionFilterOptimizer ex
* @param partitionKeys
* @return
*/
- private List<String> getMappedKeys(String[] partitionKeys) {
+ protected List<String> getMappedKeys(String[] partitionKeys) {
List<String> mappedKeys = new ArrayList<String>(partitionKeys.length);
for (int i = 0; i < partitionKeys.length; i++) {
mappedKeys.add(colNameMap.get(partitionKeys[i]));
@@ -206,11 +240,11 @@ public class PartitionFilterOptimizer ex
return mappedKeys;
}
- private void setupColNameMaps() throws FrontendException {
+ protected void setupColNameMaps() throws FrontendException {
LogicalSchema loLoadSchema = loLoad.getSchema();
LogicalSchema loadFuncSchema = loLoad.getDeterminedSchema();
for(int i = 0; i < loadFuncSchema.size(); i++) {
- colNameMap.put(loadFuncSchema.getField(i).alias,
+ colNameMap.put(loadFuncSchema.getField(i).alias,
(i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
loadFuncSchema.getField(i).alias));
Added: pig/branches/branch-0.12/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java?rev=1525974&view=auto
==============================================================================
--- pig/branches/branch-0.12/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java (added)
+++ pig/branches/branch-0.12/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java Tue Sep 24 19:04:28 2013
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import junit.framework.AssertionFailedError;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.ExecType;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigServer;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.newplan.FilterExtractor;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.BinaryExpression;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.DereferenceExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.IsNullExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.MapLookupExpression;
+import org.apache.pig.newplan.logical.expression.OrExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
+import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.parser.ParserException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * unit tests to test extracting new partition filter conditions out of the filter
+ * condition in the filter following a load which talks to metadata system (i.e.
+ * implements {@link LoadMetadata})
+ */
+public class TestNewPartitionFilterPushDown {
+ static PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
+ String query = "a = load 'foo' as (srcid:int, mrkt:chararray, dstid:int, name:chararray, " +
+ "age:int, browser:map[], location:tuple(country:chararray, zip:int));";
+ String loadquery = "a = load 'foo' using "
+ + TestLoader.class.getName() +
+ "('srcid:int, mrkt:chararray, dstid:int, name:chararray, " +
+ "age:int, browser:map[], location:tuple(country:chararray, zip:int)', " +
+ "'%s');";
+ String query2 = String.format(loadquery, "srcid,dstid");
+ String query3 = String.format(loadquery, "srcid");
+
+ /**
+ * test case where there is a single expression on partition columns in
+ * the filter expression along with an expression on non partition column
+ * @throws Exception
+ */
+ @Test
+ public void testSimpleMixed() throws Exception {
+ String q = query + "b = filter a by srcid == 10 and name == 'foo';" + "store b into 'out';";
+ test(q, Arrays.asList("srcid"), "(srcid == 10)", "(name == 'foo')");
+ }
+
+ /**
+ * test case where filter does not contain any condition on partition cols
+ * @throws Exception
+ */
+ @Test
+ public void testNoPartFilter() throws Exception {
+ String q = query + "b = filter a by age == 20 and name == 'foo';" + "store b into 'out';";
+ test(q, Arrays.asList("srcid"), null,
+ "((age == 20) and (name == 'foo'))");
+ }
+
+ /**
+ * test case where filter only contains condition on partition cols
+ * @throws Exception
+ */
+ @Test
+ public void testOnlyPartFilter1() throws Exception {
+ String q = query + "b = filter a by srcid > 20 and mrkt == 'us';" + "store b into 'out';";
+ test(q, Arrays.asList("srcid", "mrkt"),
+ "((srcid > 20) and (mrkt == 'us'))", null);
+
+ }
+
+ /**
+ * test case where filter only contains condition on partition cols
+ * @throws Exception
+ */
+ @Test
+ public void testOnlyPartFilter2() throws Exception {
+ String q = query + "b = filter a by mrkt == 'us';" + "store b into 'out';";
+ test(q, Arrays.asList("srcid", "mrkt"),
+ "(mrkt == 'us')", null);
+
+ }
+
+ /**
+ * test case where filter only contains condition on partition cols
+ * @throws Exception
+ */
+ @Test
+ public void testOnlyPartFilter3() throws Exception {
+ String q = query + "b = filter a by srcid == 20 or mrkt == 'us';" + "store b into 'out';";
+ test(q, Arrays.asList("srcid", "mrkt"),
+ "((srcid == 20) or (mrkt == 'us'))", null);
+
+ }
+
+ /**
+ * test case where filter has both conditions on partition cols and non
+ * partition cols and the filter condition will be split to extract the
+ * conditions on partition columns
+ */
+ @Test
+ public void testMixed1() throws Exception {
+ String q = query + "b = filter a by " +
+ "(age < 20 and mrkt == 'us') and (srcid == 10 and " +
+ "name == 'foo');" + "store b into 'out';";
+ test(q, Arrays.asList("srcid", "mrkt"),
+ "((mrkt == 'us') and (srcid == 10))",
+ "((age < 20) and (name == 'foo'))");
+ }
+
+
+ /**
+ * test case where filter has both conditions on partition cols and non
+ * partition cols and the filter condition will be split to extract the
+ * conditions on partition columns
+ */
+ @Test
+ public void testMixed2() throws Exception {
+ String q = query + "b = filter a by " +
+ "(age >= 20 and mrkt == 'us') and (srcid == 10 and " +
+ "dstid == 15);" + "store b into 'out';";
+ test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+ "((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))",
+ "(age >= 20)");
+ }
+
+ /**
+ * test case where filter has both conditions on partition cols and non
+ * partition cols and the filter condition will be split to extract the
+ * conditions on partition columns
+ */
+ @Test
+ public void testMixed3() throws Exception {
+ String q = query + "b = filter a by " +
+ "age >= 20 and mrkt == 'us' and srcid == 10;" + "store b into 'out';";
+ test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+ "((mrkt == 'us') and (srcid == 10))", "(age >= 20)");
+ }
+
+ /**
+ * test case where filter has both conditions on partition cols and non
+ * partition cols and the filter condition will be split to extract the
+ * conditions on partition columns - this testcase also has a condition
+ * based on comparison of two partition columns
+ */
+ @Test
+ public void testMixed4() throws Exception {
+ String q = query + "b = filter a by " +
+ "age >= 20 and mrkt == 'us' and name == 'foo' and " +
+ "srcid == dstid;" + "store b into 'out';";
+ test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+ "((mrkt == 'us') and (srcid == dstid))",
+ "((age >= 20) and (name == 'foo'))");
+ }
+
+ /**
+ * test case where filter has both conditions on partition cols and non
+ * partition cols and the filter condition will be split to extract the
+ * conditions on partition columns -
+ * This testcase has two partition col conditions with OR + non partition
+ * col conditions
+ */
+ @Test
+ public void testMixed5() throws Exception {
+ String q = query + "b = filter a by " +
+ "(srcid == 10 or mrkt == 'us') and name == 'foo' and " +
+ "dstid == 30;" + "store b into 'out';";
+ test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+ "(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))",
+ "(name == 'foo')");
+ }
+
+ /**
+ * test case where filter has both conditions on partition cols and non
+ * partition cols and the filter condition will be split to extract the
+ * conditions on partition columns -
+ * This testcase has two partition col conditions with OR + non partition
+ * col conditions
+ */
+ @Test
+ public void testMixed6() throws Exception {
+ String q = query + "b = filter a by " +
+ "dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';" + "store b into 'out';";
+ test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+ "((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))",
+ "(name == 'foo')");
+ }
+
+ /**
+ * test case where filter has both conditions on partition cols and non
+ * partition cols and the filter condition will be split to extract the
+ * conditions on partition columns. This testcase also tests arithmetic
+ * in partition column conditions
+ */
+ @Test
+ public void testMixedArith() throws Exception {
+ String q = query + "b = filter a by " +
+ "mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;" + "store b into 'out';";
+ test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+ "((mrkt == 'us') and ((srcid * 10) == (150 + 20)))",
+ "(age != 15)");
+ }
+
+ /**
+ * test case where there is a single expression on partition columns in the
+ * filter expression along with an expression on non partition column of
+ * type map
+ * @throws Exception
+ */
+ @Test
+ public void testMixedNonPartitionTypeMap() throws Exception {
+ String q = query + "b = filter a by srcid == 10 and browser#'type' == 'IE';" +
+ "store b into 'out';";
+ test(q, Arrays.asList("srcid"), "(srcid == 10)", "(browser#'type' == IE)", true);
+
+ q = query + "b = filter a by srcid == 10 and browser#'type' == 'IE' and " +
+ "browser#'version'#'major' == '8.0';" + "store b into 'out';";
+ test(q, Arrays.asList("srcid"), "(srcid == 10)", "((browser#'type' == IE) and " +
+ "(browser#'version'#'major' == 8.0))", true);
+
+
+ }
+
+ @Test
+ public void testMixedNonPartitionTypeMapComplex() throws Exception {
+ TestLoader.partFilter = null;
+ String q = "a = load 'foo' using "
+ + TestLoader.class.getName() +
+ "('srcid:int, mrkt:chararray, dstid:int, name:chararray, " +
+ "age:int, browser:map[], location:tuple(country:chararray, zip:int)', " +
+ "'srcid,mrkt');" +
+ "b = filter a by srcid == 10 and mrkt > '1' and mrkt < '5';" +
+ "c = filter b by browser#'type' == 'IE';" + "store c into 'out';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
+
+ Assert.assertEquals("checking partition filter:",
+ "(((srcid == 10) and (mrkt > '1')) and (mrkt < '5'))",
+ TestLoader.partFilter.toString());
+ Operator op = newLogicalPlan.getSinks().get(0);
+ LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
+
+ String actual =
+ getTestExpression((LogicalExpression) filter.getFilterPlan().
+ getSources().get(0)).toString();
+ Assert.assertEquals("checking trimmed filter expression:",
+ "(browser#'type' == IE)", actual);
+ }
+
+ /**
+ * test case where there is a single expression on partition columns in the
+ * filter expression along with an expression on non partition column of
+ * type tuple
+ * @throws Exception
+ */
+ @Test
+ public void testMixedNonPartitionTypeTuple() throws Exception {
+ String q = query + "b = filter a by srcid == 10 and location.country == 'US';" +
+ "store b into 'out';";
+ test(q, Arrays.asList("srcid"), "(srcid == 10)", "(location.$0 == US)", true);
+ }
+
+ @Test
+ public void testAndORConditionPartitionKeyCol() throws Exception {
+ // Case of AND and OR
+ String q = "b = filter a by (srcid == 10 and dstid == 5) " +
+ "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7);" +
+ "store b into 'out';";
+ test(query + q, Arrays.asList("srcid", "dstid"),
+ "((((srcid == 10) and (dstid == 5)) " +
+ "or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
+ null);
+ // TODO fix following test after PIG-3465
+ //testFull(query2+ q, "((((srcid == 10) and (dstid == 5)) " +
+ // "or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))", null, false);
+
+ // Additional filter on non-partition key column
+ q = "b = filter a by ((srcid == 10 and dstid == 5) " +
+ "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
+ "store b into 'out';";
+ test(query + q, Arrays.asList("srcid", "dstid"),
+ "((((srcid == 10) and (dstid == 5)) " +
+ "or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
+ "(mrkt == 'US')");
+ testFull(query2+q, "((((srcid == 10) and (dstid == 5)) " +
+ "or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
+ "(mrkt == 'US')", false);
+
+ // Additional filter on partition key column that cannot be pushed
+ q = query +
+ "b = filter a by ((srcid == 10 and dstid == 5) " +
+ "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7)) and srcid is null;" +
+ "store b into 'out';";
+ test(q, Arrays.asList("srcid", "dstid"),
+ "((((srcid == 10) and (dstid == 5)) " +
+ "or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
+ "(srcid is null)", true);
+
+ // partition key col but null condition which should not become part of
+ // the pushed down filter
+ q = query + "b = filter a by (srcid is null and dstid == 5) " +
+ "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7);" +
+ "store b into 'out';";
+ test(q, Arrays.asList("srcid", "dstid"),
+ "(((dstid == 5) or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
+ "(((srcid is null) or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))", true);
+
+ // Case of AND of ORs
+ q = query +
+ "b = filter a by (mrkt == 'US' or mrkt == 'UK') and (srcid == 11 or srcid == 10);" +
+ "store b into 'out';";
+ test(q, Arrays.asList("srcid", "mrkt"),
+ "(((mrkt == 'US') or (mrkt == 'UK')) and ((srcid == 11) or (srcid == 10)))", null);
+ test(q, Arrays.asList("srcid"),
+ "((srcid == 11) or (srcid == 10))", "((mrkt == 'US') or (mrkt == 'UK'))");
+ }
+
+ @Test
+ public void testAndORConditionMixedCol() throws Exception {
+ // Case of AND and OR with partition key and non-partition key columns
+ String q = "b = filter a by (srcid == 10 and dstid == 5) " +
+ "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7) " +
+ "or (srcid == 13 and dstid == 8);" +
+ "store b into 'out';";
+ test(query + q, Arrays.asList("srcid"), "((((srcid == 10) or (srcid == 11)) or (srcid == 12)) or (srcid == 13))",
+ "(((((srcid == 10) or (srcid == 11)) or (srcid == 12)) or (dstid == 8)) " +
+ "and ((((((srcid == 10) or (srcid == 11)) or (dstid == 7)) and " +
+ "(((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) " +
+ "and ((dstid == 5) or (dstid == 6)))) or " +
+ "(srcid == 12)) and ((((srcid == 10) or (dstid == 6)) " +
+ "and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (dstid == 7)))) or (srcid == 13)) " +
+ "and (((((srcid == 10) or (srcid == 11)) or (dstid == 7)) and (((((srcid == 10) or (dstid == 6)) " +
+ "and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (srcid == 12)) " +
+ "and ((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (dstid == 7)))) or (dstid == 8))))");
+ //
+ //testFull(query3 + q, "((((srcid == 10) or (srcid == 11)) or (srcid == 12)) or (srcid == 13))", "", false);
+
+ // Additional filter on a partition key column
+ q = query +
+ "b = filter a by ((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
+ "or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
+ "store b into 'out';";
+ test(q, Arrays.asList("srcid", "mrkt"), "((((srcid == 10) or (srcid == 11)) or (srcid == 12)) and (mrkt == 'US'))",
+ "((((srcid == 10) or (srcid == 11)) or (dstid == 7)) and (((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) " +
+ "and ((dstid == 5) or (dstid == 6)))) or (srcid == 12)) and ((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) " +
+ "and ((dstid == 5) or (dstid == 6)))) or (dstid == 7))))");
+
+ q = query + "b = filter a by (mrkt == 'US' or mrkt == 'UK') and " +
+ "((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
+ "or (srcid == 12 and dstid == 7));" +
+ "store b into 'out';";
+ test(q, Arrays.asList("srcid", "mrkt"), "(((mrkt == 'US') or (mrkt == 'UK')) and (((srcid == 10) or (srcid == 11)) or (srcid == 12)))",
+ "((((srcid == 10) or (srcid == 11)) or (dstid == 7)) and (((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (srcid == 12)) and ((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (dstid == 7))))");
+
+ // Additional filter on a non-partition key column
+ q = query +
+ "b = filter a by ((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
+ "or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
+ "store b into 'out';";
+ test(q, Arrays.asList("srcid"), "(((srcid == 10) or (srcid == 11)) or (srcid == 12))",
+ "(((((srcid == 10) or (srcid == 11)) or (dstid == 7)) " +
+ "and (((((srcid == 10) or (dstid == 6)) and " +
+ "(((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (srcid == 12)) " +
+ "and ((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) " +
+ "and ((dstid == 5) or (dstid == 6)))) or (dstid == 7)))) and (mrkt == 'US'))");
+
+ // Case of OR and AND
+ q = query +
+ "b = filter a by (mrkt == 'US' or mrkt == 'UK') and " +
+ "(srcid == 11 or srcid == 10) and (dstid == 5 or dstid == 6);" +
+ "store b into 'out';";
+ test(q, Arrays.asList("srcid"),
+ "((srcid == 11) or (srcid == 10))",
+ "(((mrkt == 'US') or (mrkt == 'UK')) and ((dstid == 5) or (dstid == 6)))");
+ test(q, Arrays.asList("mrkt"),
+ "((mrkt == 'US') or (mrkt == 'UK'))",
+ "(((srcid == 11) or (srcid == 10)) and ((dstid == 5) or (dstid == 6)))");
+ }
+
+ /**
+  * Builds the logical plan for the given script and runs a
+  * {@link MyPlanOptimizer} over it, passing 3 as the optimizer's
+  * {@code iterations} argument.
+  *
+  * @param query Pig Latin script to build the plan from
+  * @return the same plan object that was handed to the optimizer
+  * @throws Exception if building the plan or optimizing it fails
+  */
+ private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
+     LogicalPlan plan = Util.buildLp(new PigServer( pc ), query);
+     new MyPlanOptimizer( plan, 3 ).optimize();
+     return plan;
+ }
+
+ /**
+  * Optimizes the plan for {@code q} and checks both the filter handed to
+  * {@link TestLoader#setPartitionFilter(Expression)} and the filter that is
+  * left in the plan.
+  *
+  * @param q script to build and optimize
+  * @param partFilter expected string form of the pushed-down partition
+  *        filter, or null if the loader must not receive one
+  * @param filterExpr expected string form of the remaining filter, or null
+  *        if no {@link LOFilter} may remain in the plan
+  * @param unsupportedExpr if true, the remaining filter is rendered with
+  *        {@code getTestExpression} instead of
+  *        {@code FilterExtractor.getExpression}, matching what the
+  *        five-argument {@code test} helper does with its flag
+  * @throws Exception if building the plan or optimizing it fails
+  */
+ private void testFull(String q, String partFilter, String filterExpr, boolean unsupportedExpr) throws Exception {
+     // The loader records its filter in a static field; clear anything an
+     // earlier test left behind.
+     TestLoader.partFilter = null;
+     LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
+
+     if (partFilter != null) {
+         // Fail with a readable assertion rather than a NullPointerException
+         // when nothing was pushed to the loader.
+         Assert.assertNotNull("checking that a partition filter was pushed:",
+                 TestLoader.partFilter);
+         Assert.assertEquals("checking partition filter:",
+                 partFilter,
+                 TestLoader.partFilter.toString());
+     } else {
+         Assert.assertNull("checking that no partition filter was pushed:",
+                 TestLoader.partFilter);
+     }
+
+     if (filterExpr != null) {
+         Operator op = newLogicalPlan.getSinks().get(0);
+         LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
+         LogicalExpression root =
+                 (LogicalExpression) filter.getFilterPlan().getSources().get(0);
+
+         String actual = unsupportedExpr
+                 ? getTestExpression(root)
+                 : FilterExtractor.getExpression(root).toString();
+         Assert.assertEquals("checking trimmed filter expression:",
+                 filterExpr, actual);
+     } else {
+         Iterator<Operator> it = newLogicalPlan.getOperators();
+         while( it.hasNext() ) {
+             Assert.assertFalse("Checking that filter has been removed since it contained" +
+                     " only conditions on partition cols:",
+                     (it.next() instanceof LOFilter));
+         }
+     }
+ }
+
+ /**
+ * Test that pig sends correct partition column names in setPartitionFilter
+ * when the user has a schema in the load statement which renames partition
+ * columns
+ * @throws Exception
+ */
+ @Test
+ public void testColNameMapping1() throws Exception {
+ String q = "a = load 'foo' using "
+ + TestLoader.class.getName() +
+ "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+ "'srcid,mrkt') as (f1, f2, f3, f4, f5);" +
+ "b = filter a by " +
+ "(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" +
+ "store b into 'out';";
+
+ testFull(q, "((mrkt == 'us') and (srcid == 10))", "((f5 >= 20) and (f3 == 15))", false);
+ }
+
+ /**
+ * Test that pig sends correct partition column names in setPartitionFilter
+ * when the user has a schema in the load statement which renames partition
+ * columns - in this test case there is no condition on partition columns
+ * - so setPartitionFilter() should not be called and the filter condition
+ * should remain as is.
+ * @throws Exception
+ */
+ @Test
+ public void testColNameMapping2() throws Exception {
+ String q = "a = load 'foo' using "
+ + TestLoader.class.getName() +
+ "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+ "'srcid') as (f1, f2, f3, f4, f5);" +
+ "b = filter a by " +
+ "f5 >= 20 and f2 == 'us' and f3 == 15;" +
+ "store b into 'out';";
+
+ testFull(q, null, "(((f5 >= 20) and (f2 == 'us')) and (f3 == 15))", false);
+ }
+
+ /**
+ * Test that pig sends correct partition column names in setPartitionFilter
+ * when the user has a schema in the load statement which renames partition
+ * columns - in this test case the filter only has conditions on partition
+ * columns
+ * @throws Exception
+ */
+ @Test
+ public void testColNameMapping3() throws Exception {
+ String query = "a = load 'foo' using "
+ + TestLoader.class.getName() +
+ "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+ "'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);" +
+ "b = filter a by " +
+ "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);" +
+ "store b into 'out';";
+
+ testFull(query, "(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " +
+ "(dstid == 15)))", null, false);
+
+ }
+
+ /**
+ * Test that pig sends correct partition column names in setPartitionFilter
+ * when the user has a schema in the load statement which renames partition
+ * columns - in this test case the schema in load statement is a prefix
+ * (with columns renamed) of the schema returned by
+ * {@link LoadMetadata#getSchema(String, Configuration)}
+ * @throws Exception
+ */
+ @Test
+ public void testColNameMapping4() throws Exception {
+ String q = "a = load 'foo' using "
+ + TestLoader.class.getName() +
+ "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+ "'srcid,mrkt') as (f1:int, f2:chararray, f3:int, name:chararray, age:int);" +
+ "b = filter a by " +
+ "(age >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" + "store b into 'out';";
+
+ testFull(q, "((mrkt == 'us') and (srcid == 10))", "((age >= 20) and (f3 == 15))", false);
+ }
+
+ /**
+  * Test PIG-1267: two loads that both declare srcid as their partition
+  * column, each followed by a filter on srcid, then joined. The test checks
+  * that a srcid filter reached a loader, that no {@link LOFilter} is left in
+  * the plan, and that the plan holds exactly five operators.
+  * @throws Exception
+  */
+ @Test
+ public void testColNameMapping5() throws Exception {
+     TestLoader.partFilter = null;
+     String q = "a = load 'foo' using "
+         + TestLoader.class.getName() +
+         "('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " +
+         "'srcid');" +
+         "b = load 'bar' using "
+         + TestLoader.class.getName() +
+         "('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," +
+         "'srcid');" +
+         "a1 = filter a by srcid == 10;" +
+         "b1 = filter b by srcid == 20;"+
+         "c = join a1 by bcookie, b1 by bcookie;" +
+         "d = foreach c generate $4 as bcookie:chararray, " +
+         "$5 as dstid:int, $0 as mrkt:chararray;" +
+         "store d into 'out';";
+
+     LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
+
+     // Both loaders write to the same static field, so only the most recent
+     // filter is visible here; either of the two is accepted.
+     Assert.assertNotNull("checking that a partition filter was pushed:",
+             TestLoader.partFilter);
+     String partFilter = TestLoader.partFilter.toString();
+     Assert.assertTrue( "(srcid == 20)".equals( partFilter ) || "(srcid == 10)".equals( partFilter ) );
+
+     int counter = 0;
+     Iterator<Operator> iter = newLogicalPlan.getOperators();
+     while (iter.hasNext()) {
+         Assert.assertFalse(iter.next() instanceof LOFilter);
+         counter++;
+     }
+     // Expected value goes first so a failure message reads the right way round.
+     Assert.assertEquals(5, counter);
+ }
+
+ /**
+ * Test PIG-2778 Add matches operator to predicate pushdown
+ * @throws Exception
+ */
+ @Test
+ public void testMatchOpPushDown() throws Exception {
+ // regexp condition on a partition col
+ String q = query + "b = filter a by name matches 'foo*';" + "store b into 'out';";
+ test(q, Arrays.asList("name"), "(name matches 'foo*')", null);
+
+ // regexp condition on a non-partition col
+ q = query + "b = filter a by name matches 'foo*';" + "store b into 'out';";
+ test(q, Arrays.asList("srcid"), null, "(name matches 'foo*')");
+ }
+
+ /**
+ * Test PIG-3395 Large filter expression makes Pig hang
+ * @throws Exception
+ */
+ @Test
+ public void testLargeAndOrCondition() throws Exception {
+ String q = query + "b = filter a by " +
+ "(srcid == 1 and mrkt == '2' and dstid == 3) " +
+ "or (srcid == 4 and mrkt == '5' and dstid == 6) " +
+ "or (srcid == 7 and mrkt == '8' and dstid == 9) " +
+ "or (srcid == 10 and mrkt == '11' and dstid == 12) " +
+ "or (srcid == 13 and mrkt == '14' and dstid == 15) " +
+ "or (srcid == 16 and mrkt == '17' and dstid == 18) " +
+ "or (srcid == 19 and mrkt == '20' and dstid == 21) " +
+ "or (srcid == 22 and mrkt == '23' and dstid == 24) " +
+ "or (srcid == 25 and mrkt == '26' and dstid == 27) " +
+ "or (srcid == 28 and mrkt == '29' and dstid == 30) " +
+ "or (srcid == 31 and mrkt == '32' and dstid == 33) " +
+ "or (srcid == 34 and mrkt == '35' and dstid == 36) " +
+ "or (srcid == 37 and mrkt == '38' and dstid == 39) " +
+ "or (srcid == 40 and mrkt == '41' and dstid == 42) " +
+ "or (srcid == 43 and mrkt == '44' and dstid == 45) " +
+ "or (srcid == 46 and mrkt == '47' and dstid == 48) " +
+ "or (srcid == 49 and mrkt == '50' and dstid == 51) " +
+ "or (srcid == 52 and mrkt == '53' and dstid == 54) " +
+ "or (srcid == 55 and mrkt == '56' and dstid == 57) " +
+ "or (srcid == 58 and mrkt == '59' and dstid == 60) " +
+ "or (srcid == 61 and mrkt == '62' and dstid == 63) " +
+ "or (srcid == 64 and mrkt == '65' and dstid == 66) " +
+ "or (srcid == 67 and mrkt == '68' and dstid == 69);" +
+ "store b into 'out';";
+ test(q, Arrays.asList("srcid", "mrkt", "dstid"),
+ "(((((((((((((((((((((((((srcid == 1) and (mrkt == '2')) and (dstid == 3)) " +
+ "or (((srcid == 4) and (mrkt == '5')) and (dstid == 6))) " +
+ "or (((srcid == 7) and (mrkt == '8')) and (dstid == 9))) " +
+ "or (((srcid == 10) and (mrkt == '11')) and (dstid == 12))) " +
+ "or (((srcid == 13) and (mrkt == '14')) and (dstid == 15))) " +
+ "or (((srcid == 16) and (mrkt == '17')) and (dstid == 18))) " +
+ "or (((srcid == 19) and (mrkt == '20')) and (dstid == 21))) " +
+ "or (((srcid == 22) and (mrkt == '23')) and (dstid == 24))) " +
+ "or (((srcid == 25) and (mrkt == '26')) and (dstid == 27))) " +
+ "or (((srcid == 28) and (mrkt == '29')) and (dstid == 30))) " +
+ "or (((srcid == 31) and (mrkt == '32')) and (dstid == 33))) " +
+ "or (((srcid == 34) and (mrkt == '35')) and (dstid == 36))) " +
+ "or (((srcid == 37) and (mrkt == '38')) and (dstid == 39))) " +
+ "or (((srcid == 40) and (mrkt == '41')) and (dstid == 42))) " +
+ "or (((srcid == 43) and (mrkt == '44')) and (dstid == 45))) " +
+ "or (((srcid == 46) and (mrkt == '47')) and (dstid == 48))) " +
+ "or (((srcid == 49) and (mrkt == '50')) and (dstid == 51))) " +
+ "or (((srcid == 52) and (mrkt == '53')) and (dstid == 54))) " +
+ "or (((srcid == 55) and (mrkt == '56')) and (dstid == 57))) " +
+ "or (((srcid == 58) and (mrkt == '59')) and (dstid == 60))) " +
+ "or (((srcid == 61) and (mrkt == '62')) and (dstid == 63))) " +
+ "or (((srcid == 64) and (mrkt == '65')) and (dstid == 66))) " +
+ "or (((srcid == 67) and (mrkt == '68')) and (dstid == 69)))",
+ null);
+ }
+
+ // An OR that includes a condition on a non-partition column must not be pushed down
+ @Test
+ public void testOrWithNonPartitionCondition() throws Exception {
+ String q = query + "b = filter a by " +
+ "(srcid == 1 and mrkt == '2' and dstid == 3) " +
+ "or (srcid == 4 and mrkt == '5' and dstid == 6) " +
+ "or (srcid == 7 and mrkt == '8' and dstid == 9) " +
+ "or (name is null);" +
+ "store b into 'out';";
+ negativeTest(q, Arrays.asList("srcid", "mrkt", "dstid"));
+ }
+
+ //// helper methods ///////
+ /**
+ * Convenience overload of the five-argument {@code test} helper that passes
+ * {@code false} for {@code unsupportedExpression}.
+ *
+ * @return whatever the five-argument overload returns
+ */
+ private FilterExtractor test(String query, List<String> partitionCols,
+ String expPartFilterString, String expFilterString)
+ throws Exception {
+ return test(query, partitionCols, expPartFilterString, expFilterString, false);
+ }
+
+ /**
+  * Builds the plan for {@code query}, runs a {@link FilterExtractor} over the
+  * filter that feeds the first sink, and checks the extracted partition
+  * condition and the filter that remains.
+  *
+  * @param query script to build the plan from
+  * @param partitionCols column names handed to the extractor as partition columns
+  * @param expPartFilterString expected partition condition, or null if the
+  *        extractor must report none
+  * @param expFilterString expected remaining filter, or null if the extractor
+  *        must report the filter as removable
+  * @param unsupportedExpression if true, the remaining filter is rendered with
+  *        {@code getTestExpression}; otherwise with
+  *        {@code FilterExtractor.getExpression}
+  * @return the extractor, after {@code visit()} has been called on it
+  * @throws Exception if building the plan or visiting it fails
+  */
+ private FilterExtractor test(String query, List<String> partitionCols,
+         String expPartFilterString, String expFilterString, boolean unsupportedExpression)
+         throws Exception {
+     LogicalPlan plan = Util.buildLp(new PigServer( pc ), query);
+     Operator sink = plan.getSinks().get(0);
+     LOFilter loFilter = (LOFilter) plan.getPredecessors(sink).get(0);
+     FilterExtractor extractor = new FilterExtractor(loFilter.getFilterPlan(), partitionCols);
+     extractor.visit();
+
+     if (expPartFilterString != null) {
+         Assert.assertEquals("Checking partition column filter:",
+                 expPartFilterString,
+                 extractor.getPColCondition().toString());
+     } else {
+         Assert.assertEquals("Checking partition column filter:", null,
+                 extractor.getPColCondition());
+     }
+
+     // Nothing left to compare when the whole filter is expected to go away.
+     if (expFilterString == null) {
+         Assert.assertTrue("Check that filter can be removed:",
+                 extractor.isFilterRemovable());
+         return extractor;
+     }
+
+     LogicalExpression root =
+             (LogicalExpression) extractor.getFilteredPlan().getSources().get(0);
+     String actual = unsupportedExpression
+             ? getTestExpression(root).toString()
+             : FilterExtractor.getExpression(root).toString();
+     Assert.assertEquals("checking trimmed filter expression:", expFilterString, actual);
+     return extractor;
+ }
+
+ // Negative check: builds the plan for the query, runs a FilterExtractor over
+ // the filter that feeds the first sink, and asserts that the extractor's
+ // canPushDown() is false.
+ private void negativeTest(String query, List<String> partitionCols) throws Exception {
+     LogicalPlan plan = Util.buildLp(new PigServer( pc ), query);
+     Operator sink = plan.getSinks().get(0);
+     LOFilter loFilter = (LOFilter) plan.getPredecessors(sink).get(0);
+     FilterExtractor filterExtractor =
+             new FilterExtractor(loFilter.getFilterPlan(), partitionCols);
+     filterExtractor.visit();
+     Assert.assertFalse(filterExtractor.canPushDown());
+ }
+
+ /**
+ * this loader is only used to test that partition column filters are given
+ * in the manner expected in terms of column names - hence it does not
+ * implement many of the methods and only implements required ones.
+ */
+ public static class TestLoader extends LoadFunc implements LoadMetadata {
+
+ // Schema parsed from the constructor's schema string; returned by getSchema().
+ Schema schema;
+ // Partition column names from the constructor; returned by getPartitionKeys().
+ String[] partCols;
+ // Filter most recently passed to setPartitionFilter(). Static, so it is
+ // shared by every TestLoader instance and read directly by the tests.
+ static Expression partFilter = null;
+
+ /**
+ * @param schemaString schema text parsed with Utils.getSchemaFromString
+ * @param commaSepPartitionCols partition column names, split on ","
+ * (no trimming is applied)
+ * @throws ParserException declared for the schema parsing call
+ */
+ public TestLoader(String schemaString, String commaSepPartitionCols)
+ throws ParserException {
+ schema = Utils.getSchemaFromString(schemaString);
+ partCols = commaSepPartitionCols.split(",");
+ }
+
+ // Stub: always returns null.
+ @Override
+ public InputFormat getInputFormat() throws IOException {
+ return null;
+ }
+
+ // Stub: always returns null.
+ @Override
+ public Tuple getNext() throws IOException {
+ return null;
+ }
+
+ // Stub: does nothing.
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split)
+ throws IOException {
+ }
+
+ // Stub: the location is ignored.
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ }
+
+ // Returns the partition columns given to the constructor; both arguments are ignored.
+ @Override
+ public String[] getPartitionKeys(String location, Job job)
+ throws IOException {
+ return partCols;
+ }
+
+ // Wraps the constructor-supplied schema in a new ResourceSchema; both arguments are ignored.
+ @Override
+ public ResourceSchema getSchema(String location, Job job)
+ throws IOException {
+ return new ResourceSchema(schema);
+ }
+
+ // Stub: always returns null.
+ @Override
+ public ResourceStatistics getStatistics(String location,
+ Job job) throws IOException {
+ return null;
+ }
+
+ // Stores the filter in the static partFilter field so tests can inspect it.
+ @Override
+ public void setPartitionFilter(Expression partitionFilter)
+ throws IOException {
+ partFilter = partitionFilter;
+ }
+
+ }
+
+ /**
+  * Optimizer limited to the rule sets these tests need: a
+  * {@link PartitionFilterOptimizer} rule followed by a
+  * {@link LoadTypeCastInserter} rule.
+  */
+ public class MyPlanOptimizer extends LogicalPlanOptimizer {
+     /**
+      * @param p plan to optimize
+      * @param iterations passed straight through to the superclass constructor
+      */
+     protected MyPlanOptimizer(OperatorPlan p, int iterations) {
+         // The third argument is an empty set of names.
+         // NOTE(review): presumably the rules to turn off - confirm against LogicalPlanOptimizer.
+         super( p, iterations, new HashSet<String>() );
+     }
+
+     /**
+      * Builds the rule sets in the order they are added: partition filter
+      * push-down first, then load type cast insertion.
+      *
+      * @return the list of single-rule sets
+      */
+     protected List<Set<Rule>> buildRuleSets() {
+         List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+
+         // add partition filter push-down rule
+         Rule r = new PartitionFilterOptimizer("NewPartitionFilterPushDown");
+         Set<Rule> s = new HashSet<Rule>();
+         s.add(r);
+         ls.add(s);
+
+         // add load type cast inserter rule
+         r = new LoadTypeCastInserter( "LoadTypeCastInserter" );
+         s = new HashSet<Rule>();
+         s.add(r);
+         ls.add(s);
+
+         // Logical expression simplifier
+         // TODO enable this test after PIG-3465
+         /*
+         s = new HashSet<Rule>();
+         // add logical expression simplification rule
+         r = new LogicalExpressionSimplifier("FilterLogicExpressionSimplifier");
+         s.add(r);
+         ls.add(s);*/
+
+         return ls;
+     }
+ }
+
+ // Helper Functions
+ public LogicalPlan buildPlan(PigServer pigServer, String query) throws Exception {
+ try {
+ return Util.buildLp(pigServer, query);
+ } catch(Throwable t) {
+ throw new AssertionFailedError(t.getMessage());
+ }
+ }
+
+ private static String braketize(String input) {
+ return "(" + input + ")";
+ }
+
+ /**
+  * Renders a logical expression as a string for comparison in assertions.
+  * Handles constant, project, binary, cast, is-null, map-lookup and
+  * dereference expressions; casts are rendered as their inner expression.
+  *
+  * @param op expression to render; may be null
+  * @return the rendered text, or null when {@code op} is null
+  * @throws FrontendException for any other expression type
+  */
+ private static String getTestExpression(LogicalExpression op) throws FrontendException {
+     if (op == null) {
+         return null;
+     }
+     if (op instanceof ConstantExpression) {
+         return String.valueOf(((ConstantExpression) op).getValue());
+     }
+     if (op instanceof ProjectExpression) {
+         return ((ProjectExpression) op).getFieldSchema().alias;
+     }
+     if (op instanceof BinaryExpression) {
+         BinaryExpression binary = (BinaryExpression) op;
+         String left = getTestExpression(binary.getLhs());
+         String right = getTestExpression(binary.getRhs());
+         String connector;
+         if (op instanceof EqualExpression) {
+             connector = " == ";
+         } else if (op instanceof AndExpression) {
+             connector = " and ";
+         } else if (op instanceof OrExpression) {
+             connector = " or ";
+         } else {
+             // any other binary operator is rendered by its name, unpadded
+             connector = op.getName();
+         }
+         return braketize(left + connector + right);
+     }
+     if (op instanceof CastExpression) {
+         return getTestExpression(((CastExpression) op).getExpression());
+     }
+     if (op instanceof IsNullExpression) {
+         String inner = getTestExpression(((IsNullExpression) op).getExpression());
+         return braketize(inner + " is null");
+     }
+     if (op instanceof MapLookupExpression) {
+         MapLookupExpression lookup = (MapLookupExpression) op;
+         String mapText = getTestExpression(lookup.getMap());
+         String lookupKey = lookup.getLookupKey();
+         return mapText + "#'" + lookupKey + "'";
+     }
+     if (op instanceof DereferenceExpression) {
+         DereferenceExpression deref = (DereferenceExpression) op;
+         String referred = getTestExpression(deref.getReferredExpression());
+         // only the first referenced column is rendered
+         int firstColumn = deref.getBagColumns().get(0);
+         return referred + ".$" + String.valueOf(firstColumn);
+     }
+     throw new FrontendException("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
+ }
+}
Modified: pig/branches/branch-0.12/test/org/apache/pig/test/TestPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/test/org/apache/pig/test/TestPartitionFilterPushDown.java?rev=1525974&r1=1525973&r2=1525974&view=diff
==============================================================================
--- pig/branches/branch-0.12/test/org/apache/pig/test/TestPartitionFilterPushDown.java (original)
+++ pig/branches/branch-0.12/test/org/apache/pig/test/TestPartitionFilterPushDown.java Tue Sep 24 19:04:28 2013
@@ -70,6 +70,7 @@ import org.junit.Test;
* condition in the filter following a load which talks to metadata system (.i.e.
* implements {@link LoadMetadata})
*/
+@Deprecated
public class TestPartitionFilterPushDown {
static PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
String query = "a = load 'foo' as (srcid:int, mrkt:chararray, dstid:int, name:chararray, " +