You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this plain-text version.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/10/19 01:38:51 UTC
svn commit: r1399925 - in /pig/trunk: src/org/apache/pig/Expression.java
src/org/apache/pig/newplan/PColFilterExtractor.java
test/org/apache/pig/test/TestPartitionFilterPushDown.java
Author: jcoveney
Date: Thu Oct 18 23:38:50 2012
New Revision: 1399925
URL: http://svn.apache.org/viewvc?rev=1399925&view=rev
Log:
Properly commit PIG-2778
Modified:
pig/trunk/src/org/apache/pig/Expression.java
pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java
Modified: pig/trunk/src/org/apache/pig/Expression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Expression.java?rev=1399925&r1=1399924&r2=1399925&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Expression.java (original)
+++ pig/trunk/src/org/apache/pig/Expression.java Thu Oct 18 23:38:50 2012
@@ -28,16 +28,16 @@ import org.apache.pig.classification.Int
@InterfaceStability.Evolving
public abstract class Expression {
- // Operator type
+ // Operator type
public static enum OpType {
-
+
// binary arith ops
OP_PLUS (" + "),
OP_MINUS(" - "),
OP_TIMES(" * "),
OP_DIV(" / "),
OP_MOD(" % "),
-
+
//binary ops
OP_EQ(" == "),
OP_NE(" != "),
@@ -45,29 +45,30 @@ public abstract class Expression {
OP_GE(" >= "),
OP_LT(" < "),
OP_LE(" <= "),
+ OP_MATCH(" matches "),
//binary logical
OP_AND(" and "),
OP_OR(" or "),
TERM_COL(" Column "),
TERM_CONST(" Constant ");
-
+
private String str = "";
private OpType(String rep){
this.str = rep;
}
private OpType(){
}
-
+
@Override
public String toString(){
return this.str;
}
-
+
}
-
+
protected OpType opType;
-
+
/**
* @return the opType
*/
@@ -75,22 +76,22 @@ public abstract class Expression {
return opType;
}
-
-
-
+
+
+
public static class BinaryExpression extends Expression {
-
+
/**
* left hand operand
*/
Expression lhs;
-
+
/**
* right hand operand
*/
Expression rhs;
-
-
+
+
/**
* @param lhs
* @param rhs
@@ -100,35 +101,35 @@ public abstract class Expression {
this.lhs = lhs;
this.rhs = rhs;
}
-
+
/**
* @return the left hand operand
*/
public Expression getLhs() {
return lhs;
}
-
+
/**
* @return the right hand operand
*/
public Expression getRhs() {
return rhs;
}
-
+
@Override
public String toString() {
return "(" + lhs.toString() + opType.toString() + rhs.toString()
+ ")";
}
}
-
+
public static class Column extends Expression {
-
+
/**
* name of column
*/
private String name;
-
+
/**
* @param name
*/
@@ -136,7 +137,7 @@ public abstract class Expression {
this.opType = OpType.TERM_COL;
this.name = name;
}
-
+
@Override
public String toString() {
return name;
@@ -156,21 +157,21 @@ public abstract class Expression {
this.name = name;
}
}
-
+
public static class Const extends Expression {
-
+
/**
* value of the constant
*/
Object value;
-
+
/**
* @return the value
*/
public Object getValue() {
return value;
}
-
+
/**
* @param value
*/
@@ -178,10 +179,10 @@ public abstract class Expression {
this.opType = OpType.TERM_CONST;
this.value = value;
}
-
+
@Override
public String toString() {
- return (value instanceof String) ? "\'" + value + "\'":
+ return (value instanceof String) ? "\'" + value + "\'":
value.toString();
}
}
Modified: pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java?rev=1399925&r1=1399924&r2=1399925&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java Thu Oct 18 23:38:50 2012
@@ -56,17 +56,17 @@ import org.apache.pig.newplan.logical.ex
import org.apache.pig.newplan.DepthFirstWalker;
/**
- * This Visitor works on the filter condition of a LOFilter which immediately
- * follows a LOLoad that interacts with a metadata system (currently OWL) to
+ * This Visitor works on the filter condition of a LOFilter which immediately
+ * follows a LOLoad that interacts with a metadata system (currently OWL) to
* read table data. The visitor looks for conditions on partition columns in the
* filter condition and extracts those conditions out of the filter condition.
* The condition on partition cols will be used to prune partitions of the table.
*
*/
public class PColFilterExtractor extends PlanVisitor {
-
+
private static final Log LOG = LogFactory.getLog(PColFilterExtractor.class);
-
+
/**
* partition columns associated with the table
* present in the load on which the filter whose
@@ -93,14 +93,14 @@ public class PColFilterExtractor extends
private Side replaceSide = Side.NONE;
private boolean filterRemovable = false;
-
+
private boolean canPushDown = true;
-
+
@Override
public void visit() throws FrontendException {
// we will visit the leaf and it will recursively walk the plan
LogicalExpression leaf = (LogicalExpression)plan.getSources().get( 0 );
- // if the leaf is a unary operator it should be a FilterFunc in
+ // if the leaf is a unary operator it should be a FilterFunc in
// which case we don't try to extract partition filter conditions
if(leaf instanceof BinaryExpression) {
BinaryExpression binExpr = (BinaryExpression)leaf;
@@ -119,7 +119,7 @@ public class PColFilterExtractor extends
}
/**
- *
+ *
* @param plan logical plan corresponding the filter's comparison condition
* @param partitionCols list of partition columns of the table which is
* being loaded in the LOAD statement which is input to the filter
@@ -138,23 +138,14 @@ public class PColFilterExtractor extends
if(partitionCols.contains(fieldName)) {
sawKey = true;
// The condition on partition column will be used to prune the
- // scan and removed from the filter condition. Hence the condition
+ // scan and removed from the filter condition. Hence the condition
// on the partition column will not be re applied when data is read,
// so the following cases should throw error until that changes.
List<Class<?>> opsToCheckFor = new ArrayList<Class<?>>();
- opsToCheckFor.add(RegexExpression.class);
+ opsToCheckFor.add(UserFuncExpression.class);
if(checkSuccessors(project, opsToCheckFor)) {
LOG.warn("No partition filter push down: " +
- "You have an partition column ("
- + fieldName + ") inside a regexp operator in the " +
- "filter condition.");
- canPushDown = false;
- return;
- }
- opsToCheckFor.set(0, UserFuncExpression.class);
- if(checkSuccessors(project, opsToCheckFor)) {
- LOG.warn("No partition filter push down: " +
- "You have an partition column ("
+ "You have an partition column ("
+ fieldName + ") inside a function in the " +
"filter condition.");
canPushDown = false;
@@ -163,7 +154,7 @@ public class PColFilterExtractor extends
opsToCheckFor.set(0, CastExpression.class);
if(checkSuccessors(project, opsToCheckFor)) {
LOG.warn("No partition filter push down: " +
- "You have an partition column ("
+ "You have an partition column ("
+ fieldName + ") inside a cast in the " +
"filter condition.");
canPushDown = false;
@@ -172,7 +163,7 @@ public class PColFilterExtractor extends
opsToCheckFor.set(0, IsNullExpression.class);
if(checkSuccessors(project, opsToCheckFor)) {
LOG.warn("No partition filter push down: " +
- "You have an partition column ("
+ "You have an partition column ("
+ fieldName + ") inside a null check operator in the " +
"filter condition.");
canPushDown = false;
@@ -181,7 +172,7 @@ public class PColFilterExtractor extends
opsToCheckFor.set(0, BinCondExpression.class);
if(checkSuccessors(project, opsToCheckFor)) {
LOG.warn("No partition filter push down: " +
- "You have an partition column ("
+ "You have an partition column ("
+ fieldName + ") inside a bincond operator in the " +
"filter condition.");
canPushDown = false;
@@ -204,10 +195,10 @@ public class PColFilterExtractor extends
}
private void visit(BinaryExpression binOp) throws FrontendException {
- boolean lhsSawKey = false;
- boolean rhsSawKey = false;
- boolean lhsSawNonKeyCol = false;
- boolean rhsSawNonKeyCol = false;
+ boolean lhsSawKey = false;
+ boolean rhsSawKey = false;
+ boolean lhsSawNonKeyCol = false;
+ boolean rhsSawNonKeyCol = false;
sawKey = false;
sawNonKeyCol = false;
@@ -223,12 +214,12 @@ public class PColFilterExtractor extends
rhsSawKey = sawKey;
rhsSawNonKeyCol = sawNonKeyCol;
- // only in the case of an AND, we potentially split the AND to
- // remove conditions on partition columns out of the AND. For this
+ // only in the case of an AND, we potentially split the AND to
+ // remove conditions on partition columns out of the AND. For this
// we set replaceSide accordingly so that when we reach a predecessor
- // we can trim the appropriate side. If both sides of the AND have
- // conditions on partition columns, we will remove the AND completely -
- // in this case, we will not set replaceSide, but sawKey will be
+ // we can trim the appropriate side. If both sides of the AND have
+ // conditions on partition columns, we will remove the AND completely -
+ // in this case, we will not set replaceSide, but sawKey will be
// true so that as we go to higher predecessor ANDs we can trim later.
if(binOp instanceof AndExpression) {
if(lhsSawKey && rhsSawNonKeyCol){
@@ -277,13 +268,13 @@ public class PColFilterExtractor extends
/**
* check for the presence of a certain operator type in the Successors
* @param opToStartFrom
- * @param opsToCheckFor operators to be checked for at each level of
- * Successors - the ordering in the list is the order in which the ops
+ * @param opsToCheckFor operators to be checked for at each level of
+ * Successors - the ordering in the list is the order in which the ops
* will be checked.
* @return true if opsToCheckFor are found
- * @throws IOException
+ * @throws IOException
*/
- private boolean checkSuccessors(Operator opToStartFrom,
+ private boolean checkSuccessors(Operator opToStartFrom,
List<Class<?>> opsToCheckFor) throws FrontendException {
boolean done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
if(!done && !opsToCheckFor.isEmpty()) {
@@ -296,7 +287,7 @@ public class PColFilterExtractor extends
return opsToCheckFor.isEmpty();
}
- private boolean checkSuccessorsHelper(Operator opToStartFrom,
+ private boolean checkSuccessorsHelper(Operator opToStartFrom,
List<Class<?>> opsToCheckFor) throws FrontendException {
List<Operator> successors = plan.getPredecessors(
opToStartFrom);
@@ -329,12 +320,12 @@ public class PColFilterExtractor extends
// eg if replaceSide == Side.LEFT
// binexpop
- // / \ \
+ // / \ \
// child (this is the childExpr argument send in)
// / \
- // Lt Rt
+ // Lt Rt
//
- // gets converted to
+ // gets converted to
// binexpop
// /
// Rt
@@ -344,10 +335,10 @@ public class PColFilterExtractor extends
return;
}
// child's lhs operand
- LogicalExpression leftChild =
+ LogicalExpression leftChild =
((BinaryExpression)childExpr).getLhs();
// child's rhs operand
- LogicalExpression rightChild =
+ LogicalExpression rightChild =
((BinaryExpression)childExpr).getRhs();
plan.disconnect( childExpr, leftChild );
@@ -365,12 +356,12 @@ public class PColFilterExtractor extends
logInternalErrorAndSetFlag();
return;
}
- //reset
+ //reset
replaceSide = Side.NONE;
sawKey = false;
}
-
+
private void replace(Operator oldOp, Operator newOp) throws FrontendException {
List<Operator> grandParents = plan.getPredecessors( oldOp );
if( grandParents == null || grandParents.size() == 0 ) {
@@ -386,19 +377,19 @@ public class PColFilterExtractor extends
/**
* @param op
- * @throws IOException
- * @throws IOException
- * @throws IOException
+ * @throws IOException
+ * @throws IOException
+ * @throws IOException
*/
private void remove(LogicalExpression op) throws FrontendException {
pColConditions.add( getExpression( op ) );
removeTree( op );
}
-
+
/**
* Assume that the given operator is already disconnected from its predecessors.
* @param op
- * @throws FrontendException
+ * @throws FrontendException
*/
private void removeTree(Operator op) throws FrontendException {
List<Operator> succs = plan.getSuccessors( op );
@@ -406,17 +397,17 @@ public class PColFilterExtractor extends
plan.remove( op );
return;
}
-
+
Operator[] children = new Operator[succs.size()];
for( int i = 0; i < succs.size(); i++ ) {
children[i] = succs.get(i);
}
-
+
for( Operator succ : children ) {
plan.disconnect( op, succ );
removeTree( succ );
}
-
+
plan.remove( op );
}
@@ -461,6 +452,8 @@ public class PColFilterExtractor extends
return getExpression(binOp, OpType.OP_LT);
} else if(binOp instanceof LessThanEqualExpression) {
return getExpression(binOp, OpType.OP_LE);
+ } else if(binOp instanceof RegexExpression) {
+ return getExpression(binOp, OpType.OP_MATCH);
} else {
logInternalErrorAndSetFlag();
}
@@ -468,12 +461,12 @@ public class PColFilterExtractor extends
return null;
}
- private Expression getExpression(BinaryExpression binOp, OpType
+ private Expression getExpression(BinaryExpression binOp, OpType
opType) throws FrontendException {
return new Expression.BinaryExpression(getExpression(binOp.getLhs())
,getExpression(binOp.getRhs()), opType);
}
-
+
private void logInternalErrorAndSetFlag() throws FrontendException {
LOG.warn("No partition filter push down: "
+ "Internal error while processing any partition filter "
@@ -482,7 +475,7 @@ public class PColFilterExtractor extends
}
// this might get called from some visit() - in that case, delegate to
- // the other visit()s which we have defined here
+ // the other visit()s which we have defined here
private void visit(LogicalExpression op) throws FrontendException {
if(op instanceof ProjectExpression) {
visit((ProjectExpression)op);
@@ -510,11 +503,11 @@ public class PColFilterExtractor extends
}
private void visit(NotExpression not) throws FrontendException {
- visit(not.getExpression());
+ visit(not.getExpression());
}
private void visit(RegexExpression regexp) throws FrontendException {
- visit((BinaryExpression)regexp);
+ visit((BinaryExpression)regexp);
}
private void visit(BinCondExpression binCond) throws FrontendException {
Modified: pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java?rev=1399925&r1=1399924&r2=1399925&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java Thu Oct 18 23:38:50 2012
@@ -83,9 +83,9 @@ public class TestPartitionFilterPushDown
}
/**
- * test case where there is a single expression on partition columns in
+ * test case where there is a single expression on partition columns in
* the filter expression along with an expression on non partition column
- * @throws Exception
+ * @throws Exception
*/
@Test
public void testSimpleMixed() throws Exception {
@@ -100,7 +100,7 @@ public class TestPartitionFilterPushDown
@Test
public void testNoPartFilter() throws Exception {
String q = query + "b = filter a by age == 20 and name == 'foo';" + "store b into 'out';";
- test(q, Arrays.asList("srcid"), null,
+ test(q, Arrays.asList("srcid"), null,
"((age == 20) and (name == 'foo'))");
}
@@ -111,7 +111,7 @@ public class TestPartitionFilterPushDown
@Test
public void testOnlyPartFilter1() throws Exception {
String q = query + "b = filter a by srcid > 20 and mrkt == 'us';" + "store b into 'out';";
- test(q, Arrays.asList("srcid", "mrkt"),
+ test(q, Arrays.asList("srcid", "mrkt"),
"((srcid > 20) and (mrkt == 'us'))", null);
}
@@ -123,7 +123,7 @@ public class TestPartitionFilterPushDown
@Test
public void testOnlyPartFilter2() throws Exception {
String q = query + "b = filter a by mrkt == 'us';" + "store b into 'out';";
- test(q, Arrays.asList("srcid", "mrkt"),
+ test(q, Arrays.asList("srcid", "mrkt"),
"(mrkt == 'us')", null);
}
@@ -135,7 +135,7 @@ public class TestPartitionFilterPushDown
@Test
public void testOnlyPartFilter3() throws Exception {
String q = query + "b = filter a by srcid == 20 or mrkt == 'us';" + "store b into 'out';";
- test(q, Arrays.asList("srcid", "mrkt"),
+ test(q, Arrays.asList("srcid", "mrkt"),
"((srcid == 20) or (mrkt == 'us'))", null);
}
@@ -150,8 +150,8 @@ public class TestPartitionFilterPushDown
String q = query + "b = filter a by " +
"(age < 20 and mrkt == 'us') and (srcid == 10 and " +
"name == 'foo');" + "store b into 'out';";
- test(q, Arrays.asList("srcid", "mrkt"),
- "((mrkt == 'us') and (srcid == 10))",
+ test(q, Arrays.asList("srcid", "mrkt"),
+ "((mrkt == 'us') and (srcid == 10))",
"((age < 20) and (name == 'foo'))");
}
@@ -166,8 +166,8 @@ public class TestPartitionFilterPushDown
String q = query + "b = filter a by " +
"(age >= 20 and mrkt == 'us') and (srcid == 10 and " +
"dstid == 15);" + "store b into 'out';";
- test(q, Arrays.asList("srcid", "dstid", "mrkt"),
- "((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))",
+ test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+ "((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))",
"(age >= 20)");
}
@@ -180,7 +180,7 @@ public class TestPartitionFilterPushDown
public void testMixed3() throws Exception {
String q = query + "b = filter a by " +
"age >= 20 and mrkt == 'us' and srcid == 10;" + "store b into 'out';";
- test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+ test(q, Arrays.asList("srcid", "dstid", "mrkt"),
"((mrkt == 'us') and (srcid == 10))", "(age >= 20)");
}
@@ -195,16 +195,16 @@ public class TestPartitionFilterPushDown
String q = query + "b = filter a by " +
"age >= 20 and mrkt == 'us' and name == 'foo' and " +
"srcid == dstid;" + "store b into 'out';";
- test(q, Arrays.asList("srcid", "dstid", "mrkt"),
- "((mrkt == 'us') and (srcid == dstid))",
+ test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+ "((mrkt == 'us') and (srcid == dstid))",
"((age >= 20) and (name == 'foo'))");
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
- * conditions on partition columns -
- * This testcase has two partition col conditions with OR + non parition
+ * conditions on partition columns -
+ * This testcase has two partition col conditions with OR + non parition
* col conditions
*/
@Test
@@ -212,35 +212,35 @@ public class TestPartitionFilterPushDown
String q = query + "b = filter a by " +
"(srcid == 10 or mrkt == 'us') and name == 'foo' and " +
"dstid == 30;" + "store b into 'out';";
- test(q, Arrays.asList("srcid", "dstid", "mrkt"),
- "(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))",
+ test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+ "(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))",
"(name == 'foo')");
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
- * conditions on partition columns -
- * This testcase has two partition col conditions with OR + non parition
+ * conditions on partition columns -
+ * This testcase has two partition col conditions with OR + non parition
* col conditions
*/
@Test
public void testMixed6() throws Exception {
String q = query + "b = filter a by " +
"dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';" + "store b into 'out';";
- test(q, Arrays.asList("srcid", "dstid", "mrkt"),
- "((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))",
+ test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+ "((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))",
"(name == 'foo')");
}
-
+
@Test
public void test7() throws Exception {
- String query = "a = load 'foo' using " + TestLoader.class.getName() +
+ String query = "a = load 'foo' using " + TestLoader.class.getName() +
"('srcid, mrkt, dstid, name, age', 'srcid, name');" +
"b = filter a by (srcid < 20 and age < 30) or (name == 'foo' and age > 40);" +
"store b into 'output';";
LogicalPlan plan = buildPlan(new PigServer(pc), query);
-
+
Rule rule = new PartitionFilterOptimizer("test");
List<OperatorPlan> matches = rule.match(plan);
if (matches != null) {
@@ -251,20 +251,20 @@ public class TestPartitionFilterPushDown
}
}
OperatorSubPlan newPlan = (OperatorSubPlan)transformer.reportChanges();
-
+
Assert.assertTrue(newPlan.getBasePlan().isEqual(plan));
}
-
+
}
-
+
@Test
public void test8() throws Exception {
String query = "a = load 'foo' using " + TestLoader.class.getName() +
"('srcid, mrkt, dstid, name, age', 'srcid,name');" +
- "b = filter a by (srcid < 20) or (name == 'foo');" +
+ "b = filter a by (srcid < 20) or (name == 'foo');" +
"store b into 'output';";
LogicalPlan plan = Util.buildLp(new PigServer(pc), query);
-
+
Rule rule = new PartitionFilterOptimizer("test");
List<OperatorPlan> matches = rule.match(plan);
if (matches != null) {
@@ -278,10 +278,10 @@ public class TestPartitionFilterPushDown
Assert.assertTrue(newPlan.getBasePlan().size() == 3);
}
-
+
}
-
-
+
+
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
@@ -292,14 +292,14 @@ public class TestPartitionFilterPushDown
public void testMixedArith() throws Exception {
String q = query + "b = filter a by " +
"mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;" + "store b into 'out';";
- test(q, Arrays.asList("srcid", "dstid", "mrkt"),
- "((mrkt == 'us') and ((srcid * 10) == (150 + 20)))",
+ test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+ "((mrkt == 'us') and ((srcid * 10) == (150 + 20)))",
"(age != 15)");
}
@Test
public void testNegPColConditionWithNonPCol() throws Exception {
- // use of partition column condition and non partition column in
+ // use of partition column condition and non partition column in
// same condition should fail
String q = query + "b = filter a by " +
"srcid > age;" + "store b into 'out';";
@@ -308,7 +308,7 @@ public class TestPartitionFilterPushDown
"srcid + age == 20;" + "store b into 'out';";
negativeTest(q, Arrays.asList("srcid"), 1111);
- // OR of partition column condition and non partiton col condition
+ // OR of partition column condition and non partiton col condition
// should fail
q = query + "b = filter a by " +
"srcid > 10 or name == 'foo';" +
@@ -322,7 +322,7 @@ public class TestPartitionFilterPushDown
String q = query + "b = filter a by " +
"(srcid > 10 and name == 'foo') or dstid == 10;" + "store b into 'out';";
- negativeTest(q, Arrays.asList("srcid", "dstid"), expectedErrCode);
+ negativeTest(q, Arrays.asList("srcid", "dstid"), expectedErrCode);
expectedErrCode = 1110;
q = query + "b = filter a by " +
@@ -349,45 +349,45 @@ public class TestPartitionFilterPushDown
"(mrkt is not null) and name matches '.*foo.*';" + "store b into 'out';";
negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
}
-
+
// @Test
// public void testNegPColInWrongPlaces2() throws Exception {
-//
+//
// LogicalPlanTester tester = new LogicalPlanTester(pc);
// tester.buildPlan("a = load 'foo' using " + TestLoader.class.getName()
// + "('srcid, mrkt, dstid, name, age', 'srcid,dstid,mrkt');");
-//
+//
// org.apache.pig.impl.logicalLayer.LogicalPlan lp = tester
// .buildPlan("b = filter a by "
// + "(srcid > 10 and name == 'foo') or dstid == 10;");
-// negativeTest(lp);
-//
+// negativeTest(lp);
+//
// lp = tester.buildPlan("b = filter a by " +
// "CONCAT(mrkt, '_10') == 'US_10' and age == 20;");
// negativeTest(lp);
-//
+//
// lp = tester.buildPlan("b = filter a by " +
// "mrkt matches '.*us.*' and age < 15;");
// negativeTest(lp);
-//
+//
// lp = tester.buildPlan("b = filter a by " +
// "(int)mrkt == 10 and name matches '.*foo.*';");
// negativeTest(lp);
-//
+//
// lp = tester.buildPlan("b = filter a by " +
// "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';");
// negativeTest(lp);
-//
+//
// lp = tester.buildPlan("b = filter a by " +
// "(mrkt is null) and name matches '.*foo.*';");
// negativeTest(lp);
-//
+//
// lp = tester.buildPlan("b = filter a by " +
// "(mrkt is not null) and name matches '.*foo.*';");
// negativeTest(lp);
// }
-
-
+
+
/**
* Test that pig sends correct partition column names in setPartitionFilter
* when the user has a schema in the load statement which renames partition
@@ -398,27 +398,27 @@ public class TestPartitionFilterPushDown
public void testColNameMapping1() throws Exception {
TestLoader.partFilter = null;
String q = "a = load 'foo' using "
- + TestLoader.class.getName() +
+ + TestLoader.class.getName() +
"('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
"'srcid,mrkt') as (f1, f2, f3, f4, f5);" +
"b = filter a by " +
- "(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" +
+ "(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" +
"store b into 'out';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
- Assert.assertEquals("checking partition filter:",
+ Assert.assertEquals("checking partition filter:",
"((mrkt == 'us') and (srcid == 10))",
TestLoader.partFilter.toString());
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
-
+
PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
-
+
String actual = extractor.getExpression(
(LogicalExpression)filter.getFilterPlan().getSources().get(0)).
toString().toLowerCase();
- Assert.assertEquals("checking trimmed filter expression:",
+ Assert.assertEquals("checking trimmed filter expression:",
"((f5 >= 20) and (f3 == 15))", actual);
}
@@ -443,7 +443,7 @@ public class TestPartitionFilterPushDown
public void testColNameMapping2() throws Exception {
TestLoader.partFilter = null;
String q = "a = load 'foo' using "
- + TestLoader.class.getName() +
+ + TestLoader.class.getName() +
"('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
"'srcid') as (f1, f2, f3, f4, f5);" +
"b = filter a by " +
@@ -452,19 +452,19 @@ public class TestPartitionFilterPushDown
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
- Assert.assertEquals("checking partition filter:",
+ Assert.assertEquals("checking partition filter:",
null,
TestLoader.partFilter);
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
-
+
PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
-
+
String actual = extractor.getExpression(
(LogicalExpression) filter.getFilterPlan().
getSources().get(0)).
toString().toLowerCase();
- Assert.assertEquals("checking trimmed filter expression:",
+ Assert.assertEquals("checking trimmed filter expression:",
"(((f5 >= 20) and (f2 == 'us')) and (f3 == 15))", actual);
}
@@ -479,23 +479,23 @@ public class TestPartitionFilterPushDown
public void testColNameMapping3() throws Exception {
TestLoader.partFilter = null;
String query = "a = load 'foo' using "
- + TestLoader.class.getName() +
+ + TestLoader.class.getName() +
"('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
"'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);" +
"b = filter a by " +
- "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);" +
+ "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);" +
"store b into 'out';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
- Assert.assertEquals("checking partition filter:",
+ Assert.assertEquals("checking partition filter:",
"(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " +
"(dstid == 15)))",
TestLoader.partFilter.toString());
Iterator<Operator> it = newLogicalPlan.getOperators();
while( it.hasNext() ) {
Assert.assertFalse("Checking that filter has been removed since it contained" +
- " only conditions on partition cols:",
+ " only conditions on partition cols:",
(it.next() instanceof LOFilter));
}
}
@@ -503,8 +503,8 @@ public class TestPartitionFilterPushDown
/**
* Test that pig sends correct partition column names in setPartitionFilter
* when the user has a schema in the load statement which renames partition
- * columns - in this test case the schema in load statement is a prefix
- * (with columns renamed) of the schema returned by
+ * columns - in this test case the schema in load statement is a prefix
+ * (with columns renamed) of the schema returned by
* {@link LoadMetadata#getSchema(String, Configuration)}
* @throws Exception
*/
@@ -512,7 +512,7 @@ public class TestPartitionFilterPushDown
public void testColNameMapping4() throws Exception {
TestLoader.partFilter = null;
String q = "a = load 'foo' using "
- + TestLoader.class.getName() +
+ + TestLoader.class.getName() +
"('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
"'srcid,mrkt') as (f1:int, f2:chararray, f3:int, name:chararray, age:int);" +
"b = filter a by " +
@@ -520,18 +520,18 @@ public class TestPartitionFilterPushDown
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
- Assert.assertEquals("checking partition filter:",
+ Assert.assertEquals("checking partition filter:",
"((mrkt == 'us') and (srcid == 10))",
TestLoader.partFilter.toString());
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
-
+
PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
-
+
String actual = extractor.getExpression(
(LogicalExpression) filter.getFilterPlan().getSources().get(0)).
toString().toLowerCase();
- Assert.assertEquals("checking trimmed filter expression:",
+ Assert.assertEquals("checking trimmed filter expression:",
"((age >= 20) and (f3 == 15))", actual);
}
@@ -543,11 +543,11 @@ public class TestPartitionFilterPushDown
public void testColNameMapping5() throws Exception {
TestLoader.partFilter = null;
String q = "a = load 'foo' using "
- + TestLoader.class.getName() +
+ + TestLoader.class.getName() +
"('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " +
"'srcid');" +
"b = load 'bar' using "
- + TestLoader.class.getName() +
+ + TestLoader.class.getName() +
"('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," +
"'srcid');" +
"a1 = filter a by srcid == 10;" +
@@ -567,14 +567,29 @@ public class TestPartitionFilterPushDown
while (iter.hasNext()) {
Assert.assertTrue(!(iter.next() instanceof LOFilter));
counter++;
- }
+ }
Assert.assertEquals(counter, 5);
}
+ /**
+ * Test PIG-2778 Add matches operator to predicate pushdown
+ * @throws Exception
+ */
+ @Test
+ public void testMatchOpPushDown() throws Exception {
+ // regexp condition on a partition col
+ String q = query + "b = filter a by name matches 'foo*';" + "store b into 'out';";
+ test(q, Arrays.asList("name"), "(name matches 'foo*')", null);
+
+ // regexp condition on a non-partition col
+ q = query + "b = filter a by name matches 'foo*';" + "store b into 'out';";
+ test(q, Arrays.asList("srcid"), null, "(name matches 'foo*')");
+ }
+
//// helper methods ///////
- private PColFilterExtractor test(String query, List<String> partitionCols,
- String expPartFilterString, String expFilterString)
+ private PColFilterExtractor test(String query, List<String> partitionCols,
+ String expPartFilterString, String expFilterString)
throws Exception {
PigServer pigServer = new PigServer( pc );
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
@@ -585,16 +600,16 @@ public class TestPartitionFilterPushDown
pColExtractor.visit();
if(expPartFilterString == null) {
- Assert.assertEquals("Checking partition column filter:", null,
+ Assert.assertEquals("Checking partition column filter:", null,
pColExtractor.getPColCondition());
} else {
- Assert.assertEquals("Checking partition column filter:",
- expPartFilterString.toLowerCase(),
- pColExtractor.getPColCondition().toString().toLowerCase());
+ Assert.assertEquals("Checking partition column filter:",
+ expPartFilterString.toLowerCase(),
+ pColExtractor.getPColCondition().toString().toLowerCase());
}
if(expFilterString == null) {
- Assert.assertTrue("Check that filter can be removed:",
+ Assert.assertTrue("Check that filter can be removed:",
pColExtractor.isFilterRemovable());
} else {
String actual = pColExtractor.getExpression(
@@ -617,7 +632,7 @@ public class TestPartitionFilterPushDown
try {
pColExtractor.visit();
} catch(Exception e) {
- Assert.assertEquals("Checking if exception has right error code",
+ Assert.assertEquals("Checking if exception has right error code",
expectedErrorCode, LogUtils.getPigException(e).getErrorCode());
return;
}
@@ -634,7 +649,7 @@ public class TestPartitionFilterPushDown
String[] partCols;
static Expression partFilter = null;
- public TestLoader(String schemaString, String commaSepPartitionCols)
+ public TestLoader(String schemaString, String commaSepPartitionCols)
throws ParserException {
schema = Utils.getSchemaFromString(schemaString);
partCols = commaSepPartitionCols.split(",");
@@ -680,7 +695,7 @@ public class TestPartitionFilterPushDown
@Override
public void setPartitionFilter(Expression partitionFilter)
throws IOException {
- partFilter = partitionFilter;
+ partFilter = partitionFilter;
}
}
@@ -690,14 +705,14 @@ public class TestPartitionFilterPushDown
super( p, iterations, new HashSet<String>() );
}
- protected List<Set<Rule>> buildRuleSets() {
+ protected List<Set<Rule>> buildRuleSets() {
List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
Set<Rule> s = new HashSet<Rule>();
// add split filter rule
Rule r = new PartitionFilterOptimizer("PartitionFilterPushDown");
s = new HashSet<Rule>();
- s.add(r);
+ s.add(r);
ls.add(s);
r = new LoadTypeCastInserter( "LoadTypeCastInserter" );