You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy; the commit itself is available at the URL given below.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/08/06 03:04:47 UTC

svn commit: r1616066 - in /pig/trunk: src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/builtin/ test/org/apache/pig/test/

Author: rohini
Date: Wed Aug  6 01:04:46 2014
New Revision: 1616066

URL: http://svn.apache.org/r1616066
Log:
PIG-4091: Predicate pushdown for ORC - diff patch

Modified:
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
    pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java
    pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java
    pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java
    pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Wed Aug  6 01:04:46 2014
@@ -871,6 +871,7 @@ public class Main {
             System.out.println("            ColumnMapKeyPrune - Remove unused data");
             System.out.println("            AddForEach - Add ForEach to remove unneeded columns");
             System.out.println("            MergeForEach - Merge adjacent ForEach");
+            System.out.println("            GroupByConstParallelSetter - Force parallel 1 for \"group all\" statement");
             System.out.println("            PartitionFilterOptimizer - Pushdown partition filter conditions to loader implementing LoadMetaData");
             System.out.println("            PredicatePushdownOptimizer - Pushdown filter predicates to loader implementing LoadPredicatePushDown");
             System.out.println("            All - Disable all optimizations");

Modified: pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/OrcStorage.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/OrcStorage.java Wed Aug  6 01:04:46 2014
@@ -497,21 +497,20 @@ public class OrcStorage extends LoadFunc
         for (ResourceFieldSchema field : schema.getFields()) {
             switch(field.getType()) {
             case DataType.BOOLEAN:
-                //TODO: Need to find what to set for boolean. Throws error if SearchArgument value is set as boolean
+                // TODO: ORC does not seem to support it
                 break;
             case DataType.INTEGER:
             case DataType.LONG:
             case DataType.FLOAT:
             case DataType.DOUBLE:
             case DataType.DATETIME:
-            case DataType.BYTEARRAY:
             case DataType.CHARARRAY:
             case DataType.BIGINTEGER:
             case DataType.BIGDECIMAL:
                 predicateFields.add(field.getName());
                 break;
             default:
-                // Skip DataType.TUPLE, DataType.MAP and DataType.BAG
+                // Skip DataType.BYTEARRAY, DataType.TUPLE, DataType.MAP and DataType.BAG
                 break;
             }
         }
@@ -680,7 +679,6 @@ public class OrcStorage extends LoadFunc
             //TODO is this right based on what DateTimeWritable.dateToDays() does? What about pig.datetime.default.tz?
             return new DateWritable((int)(((DateTime)value).getMillis() / TimeUnit.DAYS.toMillis(1)));
         } else {
-            //TODO compare to Orc schema and change type for varchar, typecast for byte, short
             return value;
         }
     }

Modified: pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java Wed Aug  6 01:04:46 2014
@@ -49,6 +49,11 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.expression.SubtractExpression;
 import org.apache.pig.newplan.logical.expression.UnaryExpression;
 
+/**
+ *
+ * Base class for extracting filter predicates that can be pushed down to a loader, e.g. one implementing {@code LoadPredicatePushdown} or {@code LoadMetadata}
+ *
+ */
 public abstract class FilterExtractor {
 
     protected final Log LOG = LogFactory.getLog(getClass());
@@ -150,8 +155,6 @@ public abstract class FilterExtractor {
             state.pushdownExpr = op;
             state.filterExpr = null;
             return state;
-        } else if(op instanceof CastExpression) {
-            return checkPushDown(((CastExpression)op).getExpression());
         } else if (op instanceof UnaryExpression) {
             return checkPushDown((UnaryExpression) op);
         } else {
@@ -269,8 +272,12 @@ public abstract class FilterExtractor {
     }
 
     protected KeyState checkPushDown(UnaryExpression unaryExpr) throws FrontendException {
+
         KeyState state = new KeyState();
         if (isSupportedOpType(unaryExpr)) {
+            if (unaryExpr instanceof CastExpression) {
+                return checkPushDown(unaryExpr.getExpression());
+            }
             if (unaryExpr instanceof IsNullExpression) {
                 state.pushdownExpr = unaryExpr;
                 state.filterExpr = null;

Modified: pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java Wed Aug  6 01:04:46 2014
@@ -28,11 +28,9 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.expression.UnaryExpression;
 
 /**
- * This is a rewrite of {@code PColFilterExtractor}
  *
- * We traverse the expression plan bottom up and separate it into two plans
- * - pushdownExprPlan, plan that can be pushed down to the loader and
- * - filterExprPlan, remaining plan that needs to be evaluated by pig
+ * This is a rewrite of {@code PColFilterExtractor}
+ * Extracts partition filters for loaders implementing {@code LoadMetadata}
  *
  */
 public class PartitionFilterExtractor extends FilterExtractor {

Modified: pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java Wed Aug  6 01:04:46 2014
@@ -25,6 +25,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.newplan.logical.expression.AddExpression;
 import org.apache.pig.newplan.logical.expression.AndExpression;
 import org.apache.pig.newplan.logical.expression.BinaryExpression;
+import org.apache.pig.newplan.logical.expression.CastExpression;
 import org.apache.pig.newplan.logical.expression.DivideExpression;
 import org.apache.pig.newplan.logical.expression.EqualExpression;
 import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
@@ -125,7 +126,9 @@ public class PredicatePushDownFilterExtr
 
     @Override
     protected boolean isSupportedOpType(UnaryExpression unaryOp) {
-        if(unaryOp instanceof IsNullExpression) {
+        if (unaryOp instanceof CastExpression) {
+            return true;
+        } else if(unaryOp instanceof IsNullExpression) {
             return supportedOpTypes.contains(OpType.OP_NULL);
         } else if(unaryOp instanceof NotExpression) {
             return supportedOpTypes.contains(OpType.OP_NOT);

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java Wed Aug  6 01:04:46 2014
@@ -92,6 +92,8 @@ public class PredicatePushdownOptimizer 
 
         private OperatorSubPlan subPlan;
 
+        private boolean planChanged;
+
         @Override
         public boolean check(OperatorPlan matched) throws FrontendException {
             loLoad = (LOLoad)matched.getSources().get(0);
@@ -127,10 +129,10 @@ public class PredicatePushdownOptimizer 
 
         @Override
         public OperatorPlan reportChanges() {
-            // Return null in case predicate pushdown is just a hint which means the plan hasn't changed.
+            // Return null in case there is no predicate pushdown filter extracted or it is just
+            // a hint which means the plan hasn't changed.
             // If not return the modified plan which has filters removed.
-            return null;
-            //return subPlan; TODO: implement filter removal
+            return planChanged ? subPlan : null;
         }
 
         @Override
@@ -155,6 +157,18 @@ public class PredicatePushdownOptimizer 
                 } catch (IOException e) {
                     throw new FrontendException( e );
                 }
+
+                //TODO: PIG-4093
+                /*
+                if (loadPredPushdown.removeFilterPredicateFromPlan()) {
+                    planChanged = true;
+                    if(filterFinder.isFilterRemovable()) {
+                        currentPlan.removeAndReconnect( loFilter );
+                    } else {
+                        loFilter.setFilterPlan(filterFinder.getFilteredPlan());
+                    }
+                }
+                */
             }
         }
 

Modified: pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java Wed Aug  6 01:04:46 2014
@@ -62,13 +62,15 @@ public class TestOrcStoragePushdown {
 
     private static List<OpType> supportedOpTypes;
     private static MiniGenericCluster cluster;
-    private PigServer pigServer;
+    private static PigServer pigServer;
     private String query = "a = load 'foo' as (srcid:int, mrkt:chararray, dstid:int, name:chararray, " +
             "age:int, browser:map[], location:tuple(country:chararray, zip:int));";
     private OrcStorage orcStorage;
 
     private static final String basedir = "test/org/apache/pig/builtin/orc/";
-    private static final String outbasedir = System.getProperty("user.dir") + "/build/test/TestOrcStorage/";
+    private static final String inpbasedir = System.getProperty("user.dir") + "/build/test/TestOrcStorage_in/";
+    private static final String outbasedir = System.getProperty("user.dir") + "/build/test/TestOrcStorage_out/";
+    private static String INPUT = inpbasedir + "TestOrcStorage_1";
     private static String OUTPUT1 = outbasedir + "TestOrcStorage_1";
     private static String OUTPUT2 = outbasedir + "TestOrcStorage_2";
     private static String OUTPUT3 = outbasedir + "TestOrcStorage_3";
@@ -77,9 +79,10 @@ public class TestOrcStoragePushdown {
     private static File logFile;
 
     @BeforeClass
-    public static void oneTimeSetup() throws IOException{
+    public static void oneTimeSetup() throws Exception{
         cluster = MiniGenericCluster.buildCluster();
         Util.copyFromLocalToCluster(cluster, basedir + "orc-file-11-format.orc", basedir + "orc-file-11-format.orc");
+        createInputData();
 
         if(Util.WINDOWS){
             OUTPUT1 = OUTPUT1.replace("\\", "/");
@@ -111,9 +114,45 @@ public class TestOrcStoragePushdown {
         logger.addAppender(appender);
     }
 
+    private static void createInputData() throws Exception {
+        pigServer = new PigServer(ExecType.LOCAL);
+
+        new File(inpbasedir).mkdirs();
+        new File(outbasedir).mkdirs();
+        String inputTxtFile = inpbasedir + File.separator + "input.txt";
+        BufferedWriter bw = new BufferedWriter(new FileWriter(inputTxtFile));
+        long[] lVal = new long[] {100L, 200L, 300L};
+        float[] fVal = new float[] {50.0f, 100.0f, 200.0f, 300.0f};
+        double[] dVal = new double[] {1000.11, 2000.22, 3000.33};
+        StringBuilder sb = new StringBuilder();
+        for (int i=1; i <= 10000; i++) {
+            sb.append((i > 900 && i < 1100) ? true : false).append("\t"); //boolean
+            sb.append((i > 1000 && i < 3000) ? 1 : 5).append("\t"); //byte
+            sb.append((i > 2500 && i <= 4500) ? 100 : 200).append("\t"); //short
+            sb.append(i).append("\t"); //int
+            sb.append(lVal[i%3]).append("\t"); //long
+            sb.append(fVal[i%4]).append("\t"); //float
+            sb.append((i > 2500 && i < 3500) ? dVal[i%3] : dVal[i%1]).append("\t"); //double
+            sb.append((i%2 == 1 ? "" : RandomStringUtils.random(100))).append("\t"); //bytearray
+            sb.append((i%2 == 0 ? "" : RandomStringUtils.random(100))).append("\n"); //string
+            //sb.append("").append("\t"); //datetime
+            //sb.append("").append("\n"); //bigdecimal
+            bw.write(sb.toString());
+            sb.setLength(0);
+        }
+        bw.close();
+
+        // Store only 1000 rows in each row block (MIN_ROW_INDEX_STRIDE is 1000. So can't use less than that)
+        pigServer.registerQuery("A = load '" + inputTxtFile + "' as (f1:boolean, f2:int, f3:int, f4:int, f5:long, f6:float, f7:double, f8:bytearray, f9:chararray);");//, f10:datetime, f11:bigdecimal);");
+        pigServer.registerQuery("store A into '" + INPUT +"' using OrcStorage('-r 1000');");
+        Util.copyFromLocalToCluster(cluster, INPUT, INPUT);
+    }
+
     @AfterClass
     public static void tearDownAfterClass() throws Exception {
+        Util.deleteDirectory(new File(inpbasedir));
         if (cluster != null) {
+            Util.deleteFile(cluster, inpbasedir);
             cluster.shutDown();
         }
     }
@@ -137,6 +176,26 @@ public class TestOrcStoragePushdown {
     }
 
     @Test
+    public void testColumnPruning() throws Exception {
+        Util.resetStateForExecModeSwitch();
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+
+        pigServer.registerQuery("A = load '" + basedir + "orc-file-11-format.orc' using OrcStorage();");
+        ExecJob job = pigServer.store("A", OUTPUT1);
+        JobStats stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
+        long bytesWithoutPushdown = stats.getHdfsBytesRead();
+
+        pigServer.registerQuery("PRUNE = load '" + basedir + "orc-file-11-format.orc' using OrcStorage();");
+        pigServer.registerQuery("PRUNE = foreach PRUNE generate boolean1;");
+        job = pigServer.store("PRUNE", OUTPUT2);
+        Util.checkLogFileMessage(logFile, new String[]{"Columns pruned for PRUNE: $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13"}, true);
+        stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
+        long bytesWithPushdown = stats.getHdfsBytesRead();
+
+        assertTrue((bytesWithoutPushdown - bytesWithPushdown) > 300000);
+    }
+
+    @Test
     public void testSimple() throws Exception {
         String q = query + "b = filter a by srcid == 10;" + "store b into 'out';";
         Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
@@ -184,7 +243,6 @@ public class TestOrcStoragePushdown {
         String q = query + "b = filter a by srcid == 10 or srcid == 11;" + "store b into 'out';";
         Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
         SearchArgument sarg = orcStorage.getSearchArgument(expr);
-        System.out.println(sarg);
         assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
                 "leaf-1 = (EQUALS srcid 11)\n" +
                 "expr = (or leaf-0 leaf-1)", sarg.toString());
@@ -232,79 +290,106 @@ public class TestOrcStoragePushdown {
                 "expr = leaf-0", sarg.toString());
     }
 
-    // Minicluster tests which verify stats
+    //@Test
+    public void testPredicatePushdownBoolean() throws Exception {
+        testPredicatePushdownLocal("f1 == true", 10);
+    }
 
     @Test
-    public void testColumnPruneBytesRead() throws Exception {
-        Util.resetStateForExecModeSwitch();
-        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+    public void testPredicatePushdownByteShort() throws Exception {
+        //TODO: BytesWithoutPushdown was 2373190 and bytesWithPushdown was 1929669
+        // Expected to see more difference only when 3 out of 10 blocks are read. Other tests too.
+        // Investigate why.
+        testPredicatePushdown("f2 != 5 or f3 == 100", 3500, 400000);
+    }
 
-        pigServer.registerQuery("A = load '" + basedir + "orc-file-11-format.orc' using OrcStorage();");
-        ExecJob job = pigServer.store("A", OUTPUT1);
-        JobStats stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
-        long bytesWithoutPushdown = stats.getHdfsBytesRead();
+    @Test
+    public void testPredicatePushdownIntLongString() throws Exception {
+        testPredicatePushdown("f4 >= 980 and f4 < 1010 and (f5 == 100 or f9 is not null)", 20, 800000);
+    }
 
-        pigServer.registerQuery("PRUNE = load '" + basedir + "orc-file-11-format.orc' using OrcStorage();");
-        pigServer.registerQuery("PRUNE = foreach PRUNE generate boolean1;");
-        job = pigServer.store("PRUNE", OUTPUT2);
-        Util.checkLogFileMessage(logFile, new String[]{"Columns pruned for PRUNE: $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13"}, true);
-        stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
-        long bytesWithPushdown = stats.getHdfsBytesRead();
+    @Test
+    public void testPredicatePushdownFloatDouble() throws Exception {
+        testPredicatePushdown("f6 == 100.0 and f7 > 2000.00000001", 167, 800000);
+    }
 
-        assertTrue((bytesWithoutPushdown - bytesWithPushdown) > 300000);
+    //@Test
+    public void testPredicatePushdownBigDecimal() throws Exception {
     }
 
-    @Test
-    public void testPredicatePushdownBytesRead() throws Exception {
-        new File(outbasedir).mkdirs();
-        BufferedWriter bw = new BufferedWriter(new FileWriter(OUTPUT1));
-        long[] f2 = new long[] {100L, 200L, 300L};
-        for (int i=1; i <= 10000; i++) {
-            bw.write(i + "\t" + f2[i%3] + "\t" + (i%2 == 0 ? "" : RandomStringUtils.random(100))+ "\n");
-        }
-        bw.close();
+    //@Test
+    public void testPredicatePushdownTimestamp() throws Exception {
+    }
 
-        // Store only 1000 rows in each row block (MIN_ROW_INDEX_STRIDE is 1000. So can't use less than that)
-        pigServer.registerQuery("A = load '" + OUTPUT1 + "' as (f1:int, f2:long, f3:chararray);");
-        pigServer.registerQuery("store A into '" + OUTPUT2 +"' using OrcStorage('-r 1000');");
-        Util.copyFromLocalToCluster(cluster, OUTPUT2, OUTPUT2);
+    private Expression getExpressionForTest(String query, List<String> predicateCols) throws Exception {
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
+        Operator op = newLogicalPlan.getSinks().get(0);
+        LOFilter filter = (LOFilter) newLogicalPlan.getPredecessors(op).get(0);
+        PredicatePushDownFilterExtractor filterExtractor = new PredicatePushDownFilterExtractor(filter.getFilterPlan(), predicateCols, supportedOpTypes);
+        filterExtractor.visit();
+        return filterExtractor.getPushDownExpression();
+    }
+
+    // For eclipse debugging
+    private void testPredicatePushdownLocal(String filterStmt, int expectedRows) throws IOException {
+
+        PigServer pigServer_disabledRule = new PigServer(ExecType.LOCAL);
+        // Test with PredicatePushdownOptimizer disabled.
+        HashSet<String> disabledOptimizerRules = new HashSet<String>();
+        disabledOptimizerRules.add("PredicatePushdownOptimizer");
+        pigServer_disabledRule.getPigContext().getProperties().setProperty(PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
+                ObjectSerializer.serialize(disabledOptimizerRules));
+        pigServer_disabledRule.registerQuery("B = load '" + INPUT + "' using OrcStorage();");
+        pigServer_disabledRule.registerQuery("C = filter B by " + filterStmt + ";");
+
+        // Test with PredicatePushdownOptimizer enabled.
+        pigServer.registerQuery("D = load '" + INPUT + "' using OrcStorage();");
+        pigServer.registerQuery("E = filter D by " + filterStmt + ";");
+
+        //Verify that results are same
+        Util.checkQueryOutputs(pigServer_disabledRule.openIterator("C"), pigServer.openIterator("E"), expectedRows);
+    }
+
+    private void testPredicatePushdown(String filterStmt, int expectedRows, int expectedBytesReadDiff) throws IOException {
 
         Util.resetStateForExecModeSwitch();
+        // Minicluster is required to get hdfs bytes read counter value
         pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        PigServer pigServer_disabledRule = new PigServer(cluster.getExecType(), cluster.getProperties());
 
         // Test with PredicatePushdownOptimizer disabled. All 3 blocks will be read
         HashSet<String> disabledOptimizerRules = new HashSet<String>();
         disabledOptimizerRules.add("PredicatePushdownOptimizer");
-        pigServer.getPigContext().getProperties().setProperty(PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
+        pigServer_disabledRule.getPigContext().getProperties().setProperty(PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
                 ObjectSerializer.serialize(disabledOptimizerRules));
-        pigServer.registerQuery("B = load '" + OUTPUT2 + "' using OrcStorage();");
-        pigServer.registerQuery("C = filter B by f1 > 980 and f1 < 1010 and (f2 == 100 or f3 is not null);");
-        ExecJob job = pigServer.store("C", OUTPUT3);
+        pigServer_disabledRule.registerQuery("B = load '" + INPUT + "' using OrcStorage();");
+        pigServer_disabledRule.registerQuery("C = filter B by " + filterStmt + ";");
+        ExecJob job = pigServer_disabledRule.store("C", OUTPUT3);
+        //Util.copyFromClusterToLocal(cluster, OUTPUT3 + "/part-m-00000", OUTPUT3);
         JobStats stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
-        assertEquals(20, stats.getRecordWrittern());
+        assertEquals(expectedRows, stats.getRecordWrittern());
         long bytesWithoutPushdown = stats.getHdfsBytesRead();
 
         // Test with PredicatePushdownOptimizer enabled. Only 2 blocks should be read
-        pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY);
-        pigServer.registerQuery("D = load '" + OUTPUT2 + "' using OrcStorage();");
-        pigServer.registerQuery("E = filter D by f1 > 980 and f1 < 1010 and (f2 == 100 or f3 is not null);");
+        pigServer.registerQuery("D = load '" + INPUT + "' using OrcStorage();");
+        pigServer.registerQuery("E = filter D by " + filterStmt + ";");
         job = pigServer.store("E", OUTPUT4);
+        //Util.copyFromClusterToLocal(cluster, OUTPUT4 + "/part-m-00000", OUTPUT4);
         stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
-        assertEquals(20, stats.getRecordWrittern());
+        assertEquals(expectedRows, stats.getRecordWrittern());
         long bytesWithPushdown = stats.getHdfsBytesRead();
 
-        assertTrue((bytesWithoutPushdown - bytesWithPushdown) > 300000);
-        //Verify that results are same
-        Util.checkQueryOutputs(pigServer.openIterator("C"), pigServer.openIterator("E"));
-    }
+        System.out.println("bytesWithoutPushdown was " + bytesWithoutPushdown +
+                " and bytesWithPushdown was " + bytesWithPushdown);
+        assertTrue("BytesWithoutPushdown was " + bytesWithoutPushdown +
+                " and bytesWithPushdown was " + bytesWithPushdown,
+                (bytesWithoutPushdown - bytesWithPushdown) > expectedBytesReadDiff);
+        // Verify that results are same
+        Util.checkQueryOutputs(pigServer_disabledRule.openIterator("C"), pigServer.openIterator("E"), expectedRows);
+        pigServer_disabledRule.shutdown();
 
-    private Expression getExpressionForTest(String query, List<String> predicateCols) throws Exception {
-        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        Operator op = newLogicalPlan.getSinks().get(0);
-        LOFilter filter = (LOFilter) newLogicalPlan.getPredecessors(op).get(0);
-        PredicatePushDownFilterExtractor filterExtractor = new PredicatePushDownFilterExtractor(filter.getFilterPlan(), predicateCols, supportedOpTypes);
-        filterExtractor.visit();
-        return filterExtractor.getPushDownExpression();
     }
 
+
+
 }

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Wed Aug  6 01:04:46 2014
@@ -511,15 +511,19 @@ public class Util {
       * @param expectedResults Expected results List to validate against
       */
       static public void checkQueryOutputs(Iterator<Tuple> actualResults,
-            Iterator<Tuple> expectedResults) {
+            Iterator<Tuple> expectedResults, Integer expectedRows) {
+          int count = 0;
           while (expectedResults.hasNext()) {
               Tuple expected = expectedResults.next();
               Assert.assertTrue("Actual result has less records than expected results", actualResults.hasNext());
               Tuple actual = actualResults.next();
-              System.out.println("Rohini: expected : "  + expected.toString());
               Assert.assertEquals(expected.toString(), actual.toString());
+              count++;
           }
           Assert.assertFalse("Actual result has more records than expected results", actualResults.hasNext());
+          if (expectedRows != null) {
+              Assert.assertEquals((int)expectedRows, count);
+          }
       }
 
     /**