You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2013/04/29 23:20:11 UTC

svn commit: r1477347 - in /pig/trunk: CHANGES.txt src/org/apache/pig/newplan/PColFilterExtractor.java test/org/apache/pig/test/TestPartitionFilterPushDown.java

Author: rohini
Date: Mon Apr 29 21:20:10 2013
New Revision: 1477347

URL: http://svn.apache.org/r1477347
Log:
PIG-3173: Partition filter push down does not happen when partition keys condition include a AND and OR construct (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
    pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1477347&r1=1477346&r2=1477347&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 29 21:20:10 2013
@@ -28,6 +28,8 @@ PIG-3174:  Remove rpm and deb artifacts 
 
 IMPROVEMENTS
 
+PIG-3173: Partition filter push down does not happen when partition keys condition include a AND and OR construct (rohini)
+
 PIG-2786: enhance Pig launcher script wrt. HBase/HCat integration (ndimiduk via daijy)
 
 PIG-3198: Let users use any function from PigType -> PigType as if it were builtlin (jcoveney)

Modified: pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java?rev=1477347&r1=1477346&r2=1477347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java Mon Apr 29 21:20:10 2013
@@ -179,17 +179,6 @@ public class PColFilterExtractor extends
             canPushDown = false;
             return;
 			}
-			opsToCheckFor.set(0, AndExpression.class);
-			opsToCheckFor.add(OrExpression.class);
-			if(checkSuccessors(project, opsToCheckFor)) {
-            LOG.warn("No partition filter push down: " +
-                "You have an partition column (" + fieldName +
-                " ) in a construction like: " +
-                "(pcond  and ...) or (pcond and ...) " +
-                "where pcond is a condition on a partition column.");
-            canPushDown = false;
-            return;
-			}
 		} else {
 			sawNonKeyCol = true;
 		}
@@ -200,9 +189,35 @@ public class PColFilterExtractor extends
 		boolean rhsSawKey = false;
 		boolean lhsSawNonKeyCol = false;
 		boolean rhsSawNonKeyCol = false;
+        sawKey = false;
+        sawNonKeyCol = false;
 
-		sawKey = false;
-		sawNonKeyCol = false;
+        LogicalExpression binLHS = binOp.getLhs();
+        LogicalExpression binRHS = binOp.getRhs();
+        // Take care of nested OR as in
+        // ((cond1 and cond2) or (cond3 and cond4) or (cond5 and cond6)) or (cond7 and cond8)
+        if (binOp instanceof OrExpression &&
+                ((binLHS instanceof AndExpression && binRHS instanceof AndExpression) ||
+                  binLHS instanceof OrExpression || binRHS instanceof OrExpression)) {
+            visit(binLHS);
+            lhsSawNonKeyCol = sawNonKeyCol;
+            this.replaceSide = Side.NONE;
+            visit(binRHS);
+            rhsSawNonKeyCol = sawNonKeyCol;
+            this.replaceSide = Side.NONE;
+            if (lhsSawNonKeyCol || rhsSawNonKeyCol || !canPushDown) {
+                sawKey = false;
+                sawNonKeyCol = true;
+                // Don't set canPushDown to false. If there are other AND
+                // conditions on a partition column we want to push that down
+                LOG.warn("No partition filter push down: You have partition and non-partition "
+                        + "columns  in a construction like: "
+                        + "(pcond and non-pcond ..) or (pcond and non-pcond ...) "
+                        + "where pcond is a condition on a partition column and "
+                        + "non-pcond is a condition on a non-partition column.");
+                return;
+            }
+        }
 		visit( binOp.getLhs() );
 		replaceChild(binOp.getLhs());
 		lhsSawKey = sawKey;
@@ -240,8 +255,6 @@ public class PColFilterExtractor extends
 		sawNonKeyCol = lhsSawNonKeyCol || rhsSawNonKeyCol;
 	}
 
-
-
 	/**
 	 * @return the condition on partition columns extracted from filter
 	 */

Modified: pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java?rev=1477347&r1=1477346&r2=1477347&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java Mon Apr 29 21:20:10 2013
@@ -340,6 +340,98 @@ public class TestPartitionFilterPushDown
     }
 
     @Test
+    public void testAndORConditionPartitionKeyCol() throws Exception {
+        // Case of AND and OR
+        String q = query + "b = filter a by (srcid == 10 and dstid == 5) " +
+                "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7);" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid", "dstid"),
+                "((((srcid == 10) and (dstid == 5)) " +
+                "or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
+                null);
+
+        // Additional filter on non-partition key column
+        q = query +
+                "b = filter a by ((srcid == 10 and dstid == 5) " +
+                "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid", "dstid"),
+                "((((srcid == 10) and (dstid == 5)) " +
+                "or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
+                "(mrkt == 'US')");
+
+        // partition key col but null condition which should not become part of
+        // the pushed down filter
+        q = query + "b = filter a by (srcid is null and dstid == 5) " +
+                "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7);" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid", "dstid"), null);
+
+        // Case of OR and AND
+        q = query +
+                "b = filter a by (mrkt == 'US' or mrkt == 'UK') and (srcid == 11 or srcid == 10);" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid", "mrkt"),
+                "(((mrkt == 'US') or (mrkt == 'UK')) and ((srcid == 11) or (srcid == 10)))", null);
+        q = query +
+                "b = filter a by (mrkt == 'US' or mrkt == 'UK') and (srcid == 11 or srcid == 10) and dstid == 10;" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid", "mrkt"),
+                "(((mrkt == 'US') or (mrkt == 'UK')) and ((srcid == 11) or (srcid == 10)))",
+                "(dstid == 10)");
+    }
+
+    @Test
+    public void testAndORConditionMixedCol() throws Exception {
+        // Case of AND and OR with partition key and non-partition key columns
+        String q = query + "b = filter a by (srcid == 10 and dstid == 5) " +
+                "or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7) " +
+                "or (srcid == 13 and dstid == 8);" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid"), null,
+                "(((((srcid == 10) and (dstid == 5)) " +
+                "or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7))) " +
+                "or ((srcid == 13) and (dstid == 8)))");
+
+        // Additional filter on a partition key column
+        q = query +
+                "b = filter a by ((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
+                "or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid", "mrkt"), "(mrkt == 'US')",
+                "((((srcid == 10) and (dstid == 5)) or ((srcid == 11) and (dstid == 6))) " +
+                "or ((srcid == 12) and (dstid == 7)))");
+
+        q = query + "b = filter a by (mrkt == 'US' or mrkt == 'UK') and " +
+                "((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
+                "or (srcid == 12 and dstid == 7));" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid", "mrkt"), "((mrkt == 'US') or (mrkt == 'UK'))",
+                "((((srcid == 10) and (dstid == 5)) or ((srcid == 11) and (dstid == 6))) " +
+                "or ((srcid == 12) and (dstid == 7)))");
+
+        // Additional filter on a non-partition key column
+        q = query +
+                "b = filter a by ((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
+                "or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid"), null,
+                "(((((srcid == 10) and (dstid == 5)) or ((srcid == 11) and (dstid == 6))) " +
+                "or ((srcid == 12) and (dstid == 7))) and (mrkt == 'US'))");
+
+        // Case of OR and AND
+        q = query +
+                "b = filter a by (mrkt == 'US' or mrkt == 'UK') and " +
+                "(srcid == 11 or srcid == 10) and (dstid == 5 or dstid == 6);" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid"),
+                "((srcid == 11) or (srcid == 10))",
+                "(((mrkt == 'US') or (mrkt == 'UK')) and ((dstid == 5) or (dstid == 6)))");
+        test(q, Arrays.asList("mrkt"),
+                "((mrkt == 'US') or (mrkt == 'UK'))",
+                "(((srcid == 11) or (srcid == 10)) and ((dstid == 5) or (dstid == 6)))");
+    }
+
     public void testNegPColConditionWithNonPCol() throws Exception {
         // use of partition column condition and non partition column in
         // same condition should fail
@@ -656,8 +748,8 @@ public class TestPartitionFilterPushDown
                     pColExtractor.getPColCondition());
         } else  {
             Assert.assertEquals("Checking partition column filter:",
-                    expPartFilterString.toLowerCase(),
-                    pColExtractor.getPColCondition().toString().toLowerCase());
+                    expPartFilterString,
+                    pColExtractor.getPColCondition().toString());
         }
 
         // The getExpression() in PColFilterExtractor was written to get the
@@ -674,7 +766,7 @@ public class TestPartitionFilterPushDown
                 String actual = pColExtractor
                         .getExpression(
                                 (LogicalExpression) filter.getFilterPlan().getSources().get(0))
-                        .toString().toLowerCase();
+                        .toString();
                 Assert.assertEquals("checking trimmed filter expression:", expFilterString, actual);
             }
         }