You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/10/19 01:38:51 UTC

svn commit: r1399925 - in /pig/trunk: src/org/apache/pig/Expression.java src/org/apache/pig/newplan/PColFilterExtractor.java test/org/apache/pig/test/TestPartitionFilterPushDown.java

Author: jcoveney
Date: Thu Oct 18 23:38:50 2012
New Revision: 1399925

URL: http://svn.apache.org/viewvc?rev=1399925&view=rev
Log:
Properly commit PIG-2778

Modified:
    pig/trunk/src/org/apache/pig/Expression.java
    pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
    pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java

Modified: pig/trunk/src/org/apache/pig/Expression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Expression.java?rev=1399925&r1=1399924&r2=1399925&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Expression.java (original)
+++ pig/trunk/src/org/apache/pig/Expression.java Thu Oct 18 23:38:50 2012
@@ -28,16 +28,16 @@ import org.apache.pig.classification.Int
 @InterfaceStability.Evolving
 public abstract class Expression {
 
- // Operator type                                                                                                                                                                                                                                                                                       
+ // Operator type
     public static  enum OpType {
-        
+
         // binary arith ops
         OP_PLUS (" + "),
         OP_MINUS(" - "),
         OP_TIMES(" * "),
         OP_DIV(" / "),
         OP_MOD(" % "),
-          
+
         //binary ops
         OP_EQ(" == "),
         OP_NE(" != "),
@@ -45,29 +45,30 @@ public abstract class Expression {
         OP_GE(" >= "),
         OP_LT(" < "),
         OP_LE(" <= "),
+        OP_MATCH(" matches "),
 
         //binary logical
         OP_AND(" and "),
         OP_OR(" or "),
         TERM_COL(" Column "),
         TERM_CONST(" Constant ");
-        
+
         private String str = "";
         private OpType(String rep){
             this.str = rep;
         }
         private OpType(){
         }
-        
+
         @Override
         public String toString(){
             return this.str;
         }
-        
+
     }
-    
+
     protected OpType opType;
-    
+
     /**
      * @return the opType
      */
@@ -75,22 +76,22 @@ public abstract class Expression {
         return opType;
     }
 
-    
-    
-    
+
+
+
     public static class BinaryExpression extends Expression {
-        
+
         /**
          * left hand operand
          */
         Expression lhs;
-        
+
         /**
          * right hand operand
          */
         Expression rhs;
-    
-        
+
+
         /**
          * @param lhs
          * @param rhs
@@ -100,35 +101,35 @@ public abstract class Expression {
             this.lhs = lhs;
             this.rhs = rhs;
         }
-    
+
         /**
          * @return the left hand operand
          */
         public Expression getLhs() {
             return lhs;
         }
-    
+
         /**
          * @return the right hand operand
          */
         public Expression getRhs() {
             return rhs;
         }
-        
+
         @Override
         public String toString() {
             return "(" + lhs.toString() + opType.toString() + rhs.toString()
                                 + ")";
         }
     }
-    
+
     public static class Column extends Expression {
-        
+
         /**
          * name of column
          */
         private String name;
-    
+
         /**
          * @param name
          */
@@ -136,7 +137,7 @@ public abstract class Expression {
             this.opType = OpType.TERM_COL;
             this.name = name;
         }
-        
+
         @Override
         public String toString() {
             return name;
@@ -156,21 +157,21 @@ public abstract class Expression {
             this.name = name;
         }
     }
-    
+
     public static class Const extends Expression {
-        
+
         /**
          * value of the constant
          */
         Object value;
-    
+
         /**
          * @return the value
          */
         public Object getValue() {
             return value;
         }
-    
+
         /**
          * @param value
          */
@@ -178,10 +179,10 @@ public abstract class Expression {
             this.opType = OpType.TERM_CONST;
             this.value = value;
         }
-        
+
         @Override
         public String toString() {
-            return (value instanceof String) ? "\'" + value + "\'": 
+            return (value instanceof String) ? "\'" + value + "\'":
                 value.toString();
         }
     }

Modified: pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java?rev=1399925&r1=1399924&r2=1399925&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java Thu Oct 18 23:38:50 2012
@@ -56,17 +56,17 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.DepthFirstWalker;
 
 /**
- * This Visitor works on the filter condition of a LOFilter which immediately 
- * follows a LOLoad that interacts with a metadata system (currently OWL) to 
+ * This Visitor works on the filter condition of a LOFilter which immediately
+ * follows a LOLoad that interacts with a metadata system (currently OWL) to
  * read table data. The visitor looks for conditions on partition columns in the
  * filter condition and extracts those conditions out of the filter condition.
  * The condition on partition cols will be used to prune partitions of the table.
  *
  */
 public class PColFilterExtractor extends PlanVisitor {
-    
+
     private static final Log LOG = LogFactory.getLog(PColFilterExtractor.class);
-    
+
 	/**
 	 * partition columns associated with the table
 	 * present in the load on which the filter whose
@@ -93,14 +93,14 @@ public class PColFilterExtractor extends
 	private Side replaceSide = Side.NONE;
 
 	private boolean filterRemovable = false;
-	
+
     private boolean canPushDown = true;
-	
+
 	@Override
 	public void visit() throws FrontendException {
 		// we will visit the leaf and it will recursively walk the plan
 		LogicalExpression leaf = (LogicalExpression)plan.getSources().get( 0 );
-		// if the leaf is a unary operator it should be a FilterFunc in 
+		// if the leaf is a unary operator it should be a FilterFunc in
 		// which case we don't try to extract partition filter conditions
 		if(leaf instanceof BinaryExpression) {
 			BinaryExpression binExpr = (BinaryExpression)leaf;
@@ -119,7 +119,7 @@ public class PColFilterExtractor extends
 	}
 
 	/**
-	 * 
+	 *
 	 * @param plan logical plan corresponding the filter's comparison condition
 	 * @param partitionCols list of partition columns of the table which is
 	 * being loaded in the LOAD statement which is input to the filter
@@ -138,23 +138,14 @@ public class PColFilterExtractor extends
 		if(partitionCols.contains(fieldName)) {
 			sawKey = true;
 			// The condition on partition column will be used to prune the
-			// scan and removed from the filter condition. Hence the condition 
+			// scan and removed from the filter condition. Hence the condition
 			// on the partition column will not be re applied when data is read,
 			// so the following cases should throw error until that changes.
 			List<Class<?>> opsToCheckFor = new ArrayList<Class<?>>();
-			opsToCheckFor.add(RegexExpression.class);
+                        opsToCheckFor.add(UserFuncExpression.class);
 			if(checkSuccessors(project, opsToCheckFor)) {
             LOG.warn("No partition filter push down: " +
-                "You have an partition column (" 
-                + fieldName + ") inside a regexp operator in the " +
-                "filter condition.");
-            canPushDown = false;
-            return;
-			} 
-			opsToCheckFor.set(0, UserFuncExpression.class);
-			if(checkSuccessors(project, opsToCheckFor)) {
-            LOG.warn("No partition filter push down: " +
-                "You have an partition column (" 
+                "You have an partition column ("
                 + fieldName + ") inside a function in the " +
                 "filter condition.");
             canPushDown = false;
@@ -163,7 +154,7 @@ public class PColFilterExtractor extends
 			opsToCheckFor.set(0, CastExpression.class);
 			if(checkSuccessors(project, opsToCheckFor)) {
             LOG.warn("No partition filter push down: " +
-                "You have an partition column (" 
+                "You have an partition column ("
                 + fieldName + ") inside a cast in the " +
                 "filter condition.");
             canPushDown = false;
@@ -172,7 +163,7 @@ public class PColFilterExtractor extends
 			opsToCheckFor.set(0, IsNullExpression.class);
 			if(checkSuccessors(project, opsToCheckFor)) {
             LOG.warn("No partition filter push down: " +
-                "You have an partition column (" 
+                "You have an partition column ("
                 + fieldName + ") inside a null check operator in the " +
                 "filter condition.");
             canPushDown = false;
@@ -181,7 +172,7 @@ public class PColFilterExtractor extends
 			opsToCheckFor.set(0, BinCondExpression.class);
 			if(checkSuccessors(project, opsToCheckFor)) {
             LOG.warn("No partition filter push down: " +
-                "You have an partition column (" 
+                "You have an partition column ("
                 + fieldName + ") inside a bincond operator in the " +
                 "filter condition.");
             canPushDown = false;
@@ -204,10 +195,10 @@ public class PColFilterExtractor extends
 	}
 
 	private void visit(BinaryExpression binOp) throws FrontendException {
-		boolean lhsSawKey = false;        
-		boolean rhsSawKey = false;        
-		boolean lhsSawNonKeyCol = false;        
-		boolean rhsSawNonKeyCol = false;        
+		boolean lhsSawKey = false;
+		boolean rhsSawKey = false;
+		boolean lhsSawNonKeyCol = false;
+		boolean rhsSawNonKeyCol = false;
 
 		sawKey = false;
 		sawNonKeyCol = false;
@@ -223,12 +214,12 @@ public class PColFilterExtractor extends
 		rhsSawKey = sawKey;
 		rhsSawNonKeyCol = sawNonKeyCol;
 
-		// only in the case of an AND, we potentially split the AND to 
-		// remove conditions on partition columns out of the AND. For this 
+		// only in the case of an AND, we potentially split the AND to
+		// remove conditions on partition columns out of the AND. For this
 		// we set replaceSide accordingly so that when we reach a predecessor
-		// we can trim the appropriate side. If both sides of the AND have 
-		// conditions on partition columns, we will remove the AND completely - 
-		// in this case, we will not set replaceSide, but sawKey will be 
+		// we can trim the appropriate side. If both sides of the AND have
+		// conditions on partition columns, we will remove the AND completely -
+		// in this case, we will not set replaceSide, but sawKey will be
 		// true so that as we go to higher predecessor ANDs we can trim later.
 		if(binOp instanceof AndExpression) {
 			if(lhsSawKey && rhsSawNonKeyCol){
@@ -277,13 +268,13 @@ public class PColFilterExtractor extends
 	/**
 	 * check for the presence of a certain operator type in the Successors
 	 * @param opToStartFrom
-	 * @param opsToCheckFor operators to be checked for at each level of 
-	 * Successors - the ordering in the list is the order in which the ops 
+	 * @param opsToCheckFor operators to be checked for at each level of
+	 * Successors - the ordering in the list is the order in which the ops
 	 * will be checked.
 	 * @return true if opsToCheckFor are found
-	 * @throws IOException 
+	 * @throws IOException
 	 */
-	private boolean checkSuccessors(Operator opToStartFrom, 
+	private boolean checkSuccessors(Operator opToStartFrom,
 			List<Class<?>> opsToCheckFor) throws FrontendException {
 		boolean done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
 		if(!done && !opsToCheckFor.isEmpty()) {
@@ -296,7 +287,7 @@ public class PColFilterExtractor extends
 		return opsToCheckFor.isEmpty();
 	}
 
-	private boolean checkSuccessorsHelper(Operator opToStartFrom, 
+	private boolean checkSuccessorsHelper(Operator opToStartFrom,
 			List<Class<?>> opsToCheckFor) throws FrontendException {
 		List<Operator> successors = plan.getPredecessors(
 				opToStartFrom);
@@ -329,12 +320,12 @@ public class PColFilterExtractor extends
 
 		// eg if replaceSide == Side.LEFT
 		//    binexpop
-		//   /   \ \ 
+		//   /   \ \
 		// child (this is the childExpr argument send in)
 		//  /  \
-		// Lt   Rt 
+		// Lt   Rt
 		//
-		// gets converted to 
+		// gets converted to
 		//  binexpop
 		//  /
 		// Rt
@@ -344,10 +335,10 @@ public class PColFilterExtractor extends
 	         return;
 		}
 		// child's lhs operand
-		LogicalExpression leftChild = 
+		LogicalExpression leftChild =
 			((BinaryExpression)childExpr).getLhs();
 		// child's rhs operand
-		LogicalExpression rightChild = 
+		LogicalExpression rightChild =
 			((BinaryExpression)childExpr).getRhs();
 
 		plan.disconnect( childExpr, leftChild );
@@ -365,12 +356,12 @@ public class PColFilterExtractor extends
 		     logInternalErrorAndSetFlag();
 	         return;
 		}
-		//reset 
+		//reset
 		replaceSide = Side.NONE;
 		sawKey = false;
 
 	}
-	
+
 	private void replace(Operator oldOp, Operator newOp) throws FrontendException {
 		List<Operator> grandParents = plan.getPredecessors( oldOp );
 		if( grandParents == null || grandParents.size() == 0 ) {
@@ -386,19 +377,19 @@ public class PColFilterExtractor extends
 
 	/**
 	 * @param op
-	 * @throws IOException 
-	 * @throws IOException 
-	 * @throws IOException 
+	 * @throws IOException
+	 * @throws IOException
+	 * @throws IOException
 	 */
 	private void remove(LogicalExpression op) throws FrontendException {
 		pColConditions.add( getExpression( op ) );
 		removeTree( op );
 	}
-	
+
 	/**
 	 * Assume that the given operator is already disconnected from its predecessors.
 	 * @param op
-	 * @throws FrontendException 
+	 * @throws FrontendException
 	 */
 	private void removeTree(Operator op) throws FrontendException {
 		List<Operator> succs = plan.getSuccessors( op );
@@ -406,17 +397,17 @@ public class PColFilterExtractor extends
 			plan.remove( op );
 			return;
 		}
-		
+
 		Operator[] children = new Operator[succs.size()];
 		for( int i = 0; i < succs.size(); i++ ) {
 			children[i] = succs.get(i);
 		}
-		
+
 		for( Operator succ : children ) {
 			plan.disconnect( op, succ );
 			removeTree( succ );
 		}
-		
+
 		plan.remove( op );
 	}
 
@@ -461,6 +452,8 @@ public class PColFilterExtractor extends
 				return getExpression(binOp, OpType.OP_LT);
 			} else if(binOp instanceof LessThanEqualExpression) {
 				return getExpression(binOp, OpType.OP_LE);
+                        } else if(binOp instanceof RegexExpression) {
+                                return getExpression(binOp, OpType.OP_MATCH);
 			} else {
             logInternalErrorAndSetFlag();
 			}
@@ -468,12 +461,12 @@ public class PColFilterExtractor extends
 		return null;
 	}
 
-    private Expression getExpression(BinaryExpression binOp, OpType 
+    private Expression getExpression(BinaryExpression binOp, OpType
             opType) throws FrontendException {
         return new Expression.BinaryExpression(getExpression(binOp.getLhs())
                 ,getExpression(binOp.getRhs()), opType);
     }
-    
+
     private void logInternalErrorAndSetFlag() throws FrontendException {
         LOG.warn("No partition filter push down: "
                 + "Internal error while processing any partition filter "
@@ -482,7 +475,7 @@ public class PColFilterExtractor extends
     }
 
 	// this might get called from some visit() - in that case, delegate to
-	// the other visit()s which we have defined here 
+	// the other visit()s which we have defined here
 	private void visit(LogicalExpression op) throws FrontendException {
 		if(op instanceof ProjectExpression) {
 			visit((ProjectExpression)op);
@@ -510,11 +503,11 @@ public class PColFilterExtractor extends
 	}
 
 	private void visit(NotExpression not) throws FrontendException {
-		visit(not.getExpression());   
+		visit(not.getExpression());
 	}
 
 	private void visit(RegexExpression regexp) throws FrontendException {
-		visit((BinaryExpression)regexp);    
+		visit((BinaryExpression)regexp);
 	}
 
 	private void visit(BinCondExpression binCond) throws FrontendException {

Modified: pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java?rev=1399925&r1=1399924&r2=1399925&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java Thu Oct 18 23:38:50 2012
@@ -83,9 +83,9 @@ public class TestPartitionFilterPushDown
     }
 
     /**
-     * test case where there is a single expression on partition columns in 
+     * test case where there is a single expression on partition columns in
      * the filter expression along with an expression on non partition column
-     * @throws Exception 
+     * @throws Exception
      */
     @Test
     public void testSimpleMixed() throws Exception {
@@ -100,7 +100,7 @@ public class TestPartitionFilterPushDown
     @Test
     public void testNoPartFilter() throws Exception {
         String q = query + "b = filter a by age == 20 and name == 'foo';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid"), null, 
+        test(q, Arrays.asList("srcid"), null,
         "((age == 20) and (name == 'foo'))");
     }
 
@@ -111,7 +111,7 @@ public class TestPartitionFilterPushDown
     @Test
     public void testOnlyPartFilter1() throws Exception {
         String q = query + "b = filter a by srcid > 20 and mrkt == 'us';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt"), 
+        test(q, Arrays.asList("srcid", "mrkt"),
                 "((srcid > 20) and (mrkt == 'us'))", null);
 
     }
@@ -123,7 +123,7 @@ public class TestPartitionFilterPushDown
     @Test
     public void testOnlyPartFilter2() throws Exception {
         String q = query + "b = filter a by mrkt == 'us';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt"), 
+        test(q, Arrays.asList("srcid", "mrkt"),
                 "(mrkt == 'us')", null);
 
     }
@@ -135,7 +135,7 @@ public class TestPartitionFilterPushDown
     @Test
     public void testOnlyPartFilter3() throws Exception {
         String q = query + "b = filter a by srcid == 20 or mrkt == 'us';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt"), 
+        test(q, Arrays.asList("srcid", "mrkt"),
                 "((srcid == 20) or (mrkt == 'us'))", null);
 
     }
@@ -150,8 +150,8 @@ public class TestPartitionFilterPushDown
         String q = query + "b = filter a by " +
         "(age < 20 and  mrkt == 'us') and (srcid == 10 and " +
         "name == 'foo');" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "mrkt"), 
-                "((mrkt == 'us') and (srcid == 10))", 
+        test(q, Arrays.asList("srcid", "mrkt"),
+                "((mrkt == 'us') and (srcid == 10))",
         "((age < 20) and (name == 'foo'))");
     }
 
@@ -166,8 +166,8 @@ public class TestPartitionFilterPushDown
         String q = query + "b = filter a by " +
         "(age >= 20 and  mrkt == 'us') and (srcid == 10 and " +
         "dstid == 15);" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid", "mrkt"), 
-                "((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))", 
+        test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+                "((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))",
         "(age >= 20)");
     }
 
@@ -180,7 +180,7 @@ public class TestPartitionFilterPushDown
     public void testMixed3() throws Exception {
         String q = query + "b = filter a by " +
         "age >= 20 and  mrkt == 'us' and srcid == 10;" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid", "mrkt"), 
+        test(q, Arrays.asList("srcid", "dstid", "mrkt"),
                 "((mrkt == 'us') and (srcid == 10))", "(age >= 20)");
     }
 
@@ -195,16 +195,16 @@ public class TestPartitionFilterPushDown
         String q = query + "b = filter a by " +
         "age >= 20 and  mrkt == 'us' and name == 'foo' and " +
         "srcid == dstid;" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid", "mrkt"), 
-                "((mrkt == 'us') and (srcid == dstid))", 
+        test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+                "((mrkt == 'us') and (srcid == dstid))",
         "((age >= 20) and (name == 'foo'))");
     }
 
     /**
      * test case where filter has both conditions on partition cols and non
      * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns - 
-     * This testcase has two partition col conditions  with OR +  non parition 
+     * conditions on partition columns -
+     * This testcase has two partition col conditions  with OR +  non parition
      * col conditions
      */
     @Test
@@ -212,35 +212,35 @@ public class TestPartitionFilterPushDown
         String q = query + "b = filter a by " +
         "(srcid == 10 or mrkt == 'us') and name == 'foo' and " +
         "dstid == 30;" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid", "mrkt"), 
-                "(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))", 
+        test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+                "(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))",
         "(name == 'foo')");
     }
 
     /**
      * test case where filter has both conditions on partition cols and non
      * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns - 
-     * This testcase has two partition col conditions  with OR +  non parition 
+     * conditions on partition columns -
+     * This testcase has two partition col conditions  with OR +  non parition
      * col conditions
      */
     @Test
     public void testMixed6() throws Exception {
         String q = query + "b = filter a by " +
         "dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid", "mrkt"), 
-                "((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))", 
+        test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+                "((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))",
         "(name == 'foo')");
     }
-    
+
     @Test
     public void test7() throws Exception {
-        String query = "a = load 'foo' using " + TestLoader.class.getName() + 
+        String query = "a = load 'foo' using " + TestLoader.class.getName() +
             "('srcid, mrkt, dstid, name, age', 'srcid, name');" +
             "b = filter a by (srcid < 20 and age < 30) or (name == 'foo' and age > 40);" +
             "store b into 'output';";
         LogicalPlan plan = buildPlan(new PigServer(pc), query);
-        
+
         Rule rule = new PartitionFilterOptimizer("test");
         List<OperatorPlan> matches = rule.match(plan);
         if (matches != null) {
@@ -251,20 +251,20 @@ public class TestPartitionFilterPushDown
                 }
             }
             OperatorSubPlan newPlan = (OperatorSubPlan)transformer.reportChanges();
- 
+
             Assert.assertTrue(newPlan.getBasePlan().isEqual(plan));
         }
-  
+
     }
-    
+
     @Test
     public void test8() throws Exception {
         String query = "a = load 'foo' using " + TestLoader.class.getName() +
             "('srcid, mrkt, dstid, name, age', 'srcid,name');" +
-            "b = filter a by (srcid < 20) or (name == 'foo');" + 
+            "b = filter a by (srcid < 20) or (name == 'foo');" +
             "store b into 'output';";
         LogicalPlan plan = Util.buildLp(new PigServer(pc), query);
-        
+
         Rule rule = new PartitionFilterOptimizer("test");
         List<OperatorPlan> matches = rule.match(plan);
         if (matches != null) {
@@ -278,10 +278,10 @@ public class TestPartitionFilterPushDown
 
             Assert.assertTrue(newPlan.getBasePlan().size() == 3);
         }
-  
+
     }
-    
-    
+
+
     /**
      * test case where filter has both conditions on partition cols and non
      * partition cols and the filter condition will be split to extract the
@@ -292,14 +292,14 @@ public class TestPartitionFilterPushDown
     public void testMixedArith() throws Exception {
         String q = query + "b = filter a by " +
         "mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;" + "store b into 'out';";
-        test(q, Arrays.asList("srcid", "dstid", "mrkt"), 
-                "((mrkt == 'us') and ((srcid * 10) == (150 + 20)))", 
+        test(q, Arrays.asList("srcid", "dstid", "mrkt"),
+                "((mrkt == 'us') and ((srcid * 10) == (150 + 20)))",
         "(age != 15)");
     }
 
     @Test
     public void testNegPColConditionWithNonPCol() throws Exception {
-        // use of partition column condition and non partition column in 
+        // use of partition column condition and non partition column in
         // same condition should fail
         String q = query + "b = filter a by " +
         "srcid > age;" + "store b into 'out';";
@@ -308,7 +308,7 @@ public class TestPartitionFilterPushDown
         "srcid + age == 20;" + "store b into 'out';";
         negativeTest(q, Arrays.asList("srcid"), 1111);
 
-        // OR of partition column condition and non partiton col condition 
+        // OR of partition column condition and non partiton col condition
         // should fail
         q = query + "b = filter a by " +
         "srcid > 10 or name == 'foo';" +
@@ -322,7 +322,7 @@ public class TestPartitionFilterPushDown
 
         String q = query + "b = filter a by " +
         "(srcid > 10 and name == 'foo') or dstid == 10;" + "store b into 'out';";
-        negativeTest(q, Arrays.asList("srcid", "dstid"), expectedErrCode); 
+        negativeTest(q, Arrays.asList("srcid", "dstid"), expectedErrCode);
 
         expectedErrCode = 1110;
         q = query + "b = filter a by " +
@@ -349,45 +349,45 @@ public class TestPartitionFilterPushDown
         "(mrkt is not null) and name matches '.*foo.*';" + "store b into 'out';";
         negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
     }
-    
+
 //    @Test
 //    public void testNegPColInWrongPlaces2() throws Exception {
-//        
+//
 //        LogicalPlanTester tester = new LogicalPlanTester(pc);
 //        tester.buildPlan("a = load 'foo' using " + TestLoader.class.getName()
 //                + "('srcid, mrkt, dstid, name, age', 'srcid,dstid,mrkt');");
-//        
+//
 //        org.apache.pig.impl.logicalLayer.LogicalPlan lp = tester
 //                .buildPlan("b = filter a by "
 //                        + "(srcid > 10 and name == 'foo') or dstid == 10;");
-//        negativeTest(lp); 
-//        
+//        negativeTest(lp);
+//
 //        lp = tester.buildPlan("b = filter a by " +
 //                "CONCAT(mrkt, '_10') == 'US_10' and age == 20;");
 //        negativeTest(lp);
-//        
+//
 //        lp = tester.buildPlan("b = filter a by " +
 //                "mrkt matches '.*us.*' and age < 15;");
 //        negativeTest(lp);
-//        
+//
 //        lp = tester.buildPlan("b = filter a by " +
 //                "(int)mrkt == 10 and name matches '.*foo.*';");
 //        negativeTest(lp);
-//        
+//
 //        lp = tester.buildPlan("b = filter a by " +
 //            "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';");
 //        negativeTest(lp);
-//        
+//
 //        lp = tester.buildPlan("b = filter a by " +
 //            "(mrkt is null) and name matches '.*foo.*';");
 //        negativeTest(lp);
-//        
+//
 //        lp = tester.buildPlan("b = filter a by " +
 //            "(mrkt is not null) and name matches '.*foo.*';");
 //        negativeTest(lp);
 //    }
-    
-    
+
+
     /**
      * Test that pig sends correct partition column names in setPartitionFilter
      * when the user has a schema in the load statement which renames partition
@@ -398,27 +398,27 @@ public class TestPartitionFilterPushDown
     public void testColNameMapping1() throws Exception {
         TestLoader.partFilter = null;
         String q = "a = load 'foo' using "
-            + TestLoader.class.getName() + 
+            + TestLoader.class.getName() +
             "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
             "'srcid,mrkt') as (f1, f2, f3, f4, f5);" +
             "b = filter a by " +
-            "(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" + 
+            "(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" +
             "store b into 'out';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
 
-        Assert.assertEquals("checking partition filter:",             
+        Assert.assertEquals("checking partition filter:",
                 "((mrkt == 'us') and (srcid == 10))",
                 TestLoader.partFilter.toString());
         Operator op = newLogicalPlan.getSinks().get(0);
         LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
-        
+
         PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
-        
+
         String actual = extractor.getExpression(
                 (LogicalExpression)filter.getFilterPlan().getSources().get(0)).
                 toString().toLowerCase();
-        Assert.assertEquals("checking trimmed filter expression:", 
+        Assert.assertEquals("checking trimmed filter expression:",
                 "((f5 >= 20) and (f3 == 15))", actual);
     }
 
@@ -443,7 +443,7 @@ public class TestPartitionFilterPushDown
     public void testColNameMapping2() throws Exception {
         TestLoader.partFilter = null;
         String q = "a = load 'foo' using "
-            + TestLoader.class.getName() + 
+            + TestLoader.class.getName() +
             "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
             "'srcid') as (f1, f2, f3, f4, f5);" +
             "b = filter a by " +
@@ -452,19 +452,19 @@ public class TestPartitionFilterPushDown
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
 
-        Assert.assertEquals("checking partition filter:",             
+        Assert.assertEquals("checking partition filter:",
                 null,
                 TestLoader.partFilter);
         Operator op = newLogicalPlan.getSinks().get(0);
         LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
-        
+
         PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
-        
+
         String actual = extractor.getExpression(
                 (LogicalExpression) filter.getFilterPlan().
                 getSources().get(0)).
                 toString().toLowerCase();
-        Assert.assertEquals("checking trimmed filter expression:", 
+        Assert.assertEquals("checking trimmed filter expression:",
                 "(((f5 >= 20) and (f2 == 'us')) and (f3 == 15))", actual);
     }
 
@@ -479,23 +479,23 @@ public class TestPartitionFilterPushDown
     public void testColNameMapping3() throws Exception {
         TestLoader.partFilter = null;
         String query = "a = load 'foo' using "
-            + TestLoader.class.getName() + 
+            + TestLoader.class.getName() +
             "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
             "'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);" +
             "b = filter a by " +
-            "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);" + 
+            "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);" +
             "store b into 'out';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
-        Assert.assertEquals("checking partition filter:",             
+        Assert.assertEquals("checking partition filter:",
                 "(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " +
                 "(dstid == 15)))",
                 TestLoader.partFilter.toString());
         Iterator<Operator> it = newLogicalPlan.getOperators();
         while( it.hasNext() ) {
 	        Assert.assertFalse("Checking that filter has been removed since it contained" +
-	                " only conditions on partition cols:", 
+	                " only conditions on partition cols:",
 	                (it.next() instanceof LOFilter));
         }
     }
@@ -503,8 +503,8 @@ public class TestPartitionFilterPushDown
     /**
      * Test that pig sends correct partition column names in setPartitionFilter
      * when the user has a schema in the load statement which renames partition
-     * columns - in this test case the schema in load statement is a prefix 
-     * (with columns renamed) of the schema returned by 
+     * columns - in this test case the schema in load statement is a prefix
+     * (with columns renamed) of the schema returned by
      * {@link LoadMetadata#getSchema(String, Configuration)}
      * @throws Exception
      */
@@ -512,7 +512,7 @@ public class TestPartitionFilterPushDown
     public void testColNameMapping4() throws Exception {
         TestLoader.partFilter = null;
         String q = "a = load 'foo' using "
-            + TestLoader.class.getName() + 
+            + TestLoader.class.getName() +
             "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
             "'srcid,mrkt') as (f1:int, f2:chararray, f3:int, name:chararray, age:int);" +
             "b = filter a by " +
@@ -520,18 +520,18 @@ public class TestPartitionFilterPushDown
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
 
-        Assert.assertEquals("checking partition filter:",             
+        Assert.assertEquals("checking partition filter:",
                 "((mrkt == 'us') and (srcid == 10))",
                 TestLoader.partFilter.toString());
         Operator op = newLogicalPlan.getSinks().get(0);
         LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
-        
+
         PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
-        
+
         String actual = extractor.getExpression(
                 (LogicalExpression) filter.getFilterPlan().getSources().get(0)).
                 toString().toLowerCase();
-        Assert.assertEquals("checking trimmed filter expression:", 
+        Assert.assertEquals("checking trimmed filter expression:",
                 "((age >= 20) and (f3 == 15))", actual);
     }
 
@@ -543,11 +543,11 @@ public class TestPartitionFilterPushDown
     public void testColNameMapping5() throws Exception {
         TestLoader.partFilter = null;
         String q = "a = load 'foo' using "
-            + TestLoader.class.getName() + 
+            + TestLoader.class.getName() +
             "('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " +
             "'srcid');" +
             "b = load 'bar' using "
-            + TestLoader.class.getName() + 
+            + TestLoader.class.getName() +
             "('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," +
             "'srcid');" +
             "a1 = filter a by srcid == 10;" +
@@ -567,14 +567,29 @@ public class TestPartitionFilterPushDown
         while (iter.hasNext()) {
             Assert.assertTrue(!(iter.next() instanceof LOFilter));
             counter++;
-        }      
+        }
         Assert.assertEquals(counter, 5);
     }
 
+    /**
+     * Test PIG-2778 Add matches operator to predicate pushdown
+     * @throws Exception
+     */
+    @Test
+    public void testMatchOpPushDown() throws Exception {
+        // regexp condition on a partition col
+        String q = query + "b = filter a by name matches 'foo*';" + "store b into 'out';";
+        test(q, Arrays.asList("name"), "(name matches 'foo*')", null);
+
+        // regexp condition on a non-partition col
+        q = query + "b = filter a by name matches 'foo*';" + "store b into 'out';";
+        test(q, Arrays.asList("srcid"), null, "(name matches 'foo*')");
+    }
+
     //// helper methods ///////
 
-    private PColFilterExtractor test(String query, List<String> partitionCols, 
-            String expPartFilterString, String expFilterString) 
+    private PColFilterExtractor test(String query, List<String> partitionCols,
+            String expPartFilterString, String expFilterString)
     throws Exception {
         PigServer pigServer = new PigServer( pc );
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
@@ -585,16 +600,16 @@ public class TestPartitionFilterPushDown
         pColExtractor.visit();
 
         if(expPartFilterString == null) {
-            Assert.assertEquals("Checking partition column filter:", null, 
+            Assert.assertEquals("Checking partition column filter:", null,
                     pColExtractor.getPColCondition());
         } else  {
-            Assert.assertEquals("Checking partition column filter:", 
-                    expPartFilterString.toLowerCase(), 
-                    pColExtractor.getPColCondition().toString().toLowerCase());   
+            Assert.assertEquals("Checking partition column filter:",
+                    expPartFilterString.toLowerCase(),
+                    pColExtractor.getPColCondition().toString().toLowerCase());
         }
 
         if(expFilterString == null) {
-            Assert.assertTrue("Check that filter can be removed:", 
+            Assert.assertTrue("Check that filter can be removed:",
                     pColExtractor.isFilterRemovable());
         } else {
             String actual = pColExtractor.getExpression(
@@ -617,7 +632,7 @@ public class TestPartitionFilterPushDown
         try {
             pColExtractor.visit();
         } catch(Exception e) {
-            Assert.assertEquals("Checking if exception has right error code", 
+            Assert.assertEquals("Checking if exception has right error code",
                     expectedErrorCode, LogUtils.getPigException(e).getErrorCode());
             return;
         }
@@ -634,7 +649,7 @@ public class TestPartitionFilterPushDown
         String[] partCols;
         static Expression partFilter = null;
 
-        public TestLoader(String schemaString, String commaSepPartitionCols) 
+        public TestLoader(String schemaString, String commaSepPartitionCols)
         throws ParserException {
             schema = Utils.getSchemaFromString(schemaString);
             partCols = commaSepPartitionCols.split(",");
@@ -680,7 +695,7 @@ public class TestPartitionFilterPushDown
         @Override
         public void setPartitionFilter(Expression partitionFilter)
         throws IOException {
-            partFilter = partitionFilter;            
+            partFilter = partitionFilter;
         }
 
     }
@@ -690,14 +705,14 @@ public class TestPartitionFilterPushDown
             super( p, iterations, new HashSet<String>() );
         }
 
-        protected List<Set<Rule>> buildRuleSets() {            
+        protected List<Set<Rule>> buildRuleSets() {
             List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
 
             Set<Rule> s = new HashSet<Rule>();
             // add split filter rule
             Rule r = new PartitionFilterOptimizer("PartitionFilterPushDown");
             s = new HashSet<Rule>();
-            s.add(r);            
+            s.add(r);
             ls.add(s);
 
             r = new LoadTypeCastInserter( "LoadTypeCastInserter" );