You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/08/06 03:04:47 UTC
svn commit: r1616066 - in /pig/trunk: src/org/apache/pig/
src/org/apache/pig/builtin/ src/org/apache/pig/newplan/
src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/builtin/
test/org/apache/pig/test/
Author: rohini
Date: Wed Aug 6 01:04:46 2014
New Revision: 1616066
URL: http://svn.apache.org/r1616066
Log:
PIG-4091: Predicate pushdown for ORC - diff patch
Modified:
pig/trunk/src/org/apache/pig/Main.java
pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java
pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java
pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java
pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
pig/trunk/test/org/apache/pig/test/Util.java
Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Wed Aug 6 01:04:46 2014
@@ -871,6 +871,7 @@ public class Main {
System.out.println(" ColumnMapKeyPrune - Remove unused data");
System.out.println(" AddForEach - Add ForEach to remove unneeded columns");
System.out.println(" MergeForEach - Merge adjacent ForEach");
+ System.out.println(" GroupByConstParallelSetter - Force parallel 1 for \"group all\" statement");
System.out.println(" PartitionFilterOptimizer - Pushdown partition filter conditions to loader implementing LoadMetaData");
System.out.println(" PredicatePushdownOptimizer - Pushdown filter predicates to loader implementing LoadPredicatePushDown");
System.out.println(" All - Disable all optimizations");
Modified: pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/OrcStorage.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/OrcStorage.java Wed Aug 6 01:04:46 2014
@@ -497,21 +497,20 @@ public class OrcStorage extends LoadFunc
for (ResourceFieldSchema field : schema.getFields()) {
switch(field.getType()) {
case DataType.BOOLEAN:
- //TODO: Need to find what to set for boolean. Throws error if SearchArgument value is set as boolean
+ // TODO: ORC does not seem to support it
break;
case DataType.INTEGER:
case DataType.LONG:
case DataType.FLOAT:
case DataType.DOUBLE:
case DataType.DATETIME:
- case DataType.BYTEARRAY:
case DataType.CHARARRAY:
case DataType.BIGINTEGER:
case DataType.BIGDECIMAL:
predicateFields.add(field.getName());
break;
default:
- // Skip DataType.TUPLE, DataType.MAP and DataType.BAG
+ // Skip DataType.BYTEARRAY, DataType.TUPLE, DataType.MAP and DataType.BAG
break;
}
}
@@ -680,7 +679,6 @@ public class OrcStorage extends LoadFunc
//TODO is this right based on what DateTimeWritable.dateToDays() does? What about pig.datetime.default.tz?
return new DateWritable((int)(((DateTime)value).getMillis() / TimeUnit.DAYS.toMillis(1)));
} else {
- //TODO compare to Orc schema and change type for varchar, typecast for byte, short
return value;
}
}
Modified: pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java Wed Aug 6 01:04:46 2014
@@ -49,6 +49,11 @@ import org.apache.pig.newplan.logical.ex
import org.apache.pig.newplan.logical.expression.SubtractExpression;
import org.apache.pig.newplan.logical.expression.UnaryExpression;
+/**
+ *
+ * Extracts filter predicates for interfaces implementing {@code LoadPredicatePushdown}
+ *
+ */
public abstract class FilterExtractor {
protected final Log LOG = LogFactory.getLog(getClass());
@@ -150,8 +155,6 @@ public abstract class FilterExtractor {
state.pushdownExpr = op;
state.filterExpr = null;
return state;
- } else if(op instanceof CastExpression) {
- return checkPushDown(((CastExpression)op).getExpression());
} else if (op instanceof UnaryExpression) {
return checkPushDown((UnaryExpression) op);
} else {
@@ -269,8 +272,12 @@ public abstract class FilterExtractor {
}
protected KeyState checkPushDown(UnaryExpression unaryExpr) throws FrontendException {
+
KeyState state = new KeyState();
if (isSupportedOpType(unaryExpr)) {
+ if (unaryExpr instanceof CastExpression) {
+ return checkPushDown(unaryExpr.getExpression());
+ }
if (unaryExpr instanceof IsNullExpression) {
state.pushdownExpr = unaryExpr;
state.filterExpr = null;
Modified: pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java Wed Aug 6 01:04:46 2014
@@ -28,11 +28,9 @@ import org.apache.pig.newplan.logical.ex
import org.apache.pig.newplan.logical.expression.UnaryExpression;
/**
- * This is a rewrite of {@code PColFilterExtractor}
*
- * We traverse the expression plan bottom up and separate it into two plans
- * - pushdownExprPlan, plan that can be pushed down to the loader and
- * - filterExprPlan, remaining plan that needs to be evaluated by pig
+ * This is a rewrite of {@code PColFilterExtractor}
+ * Extracts partition filters for interfaces implementing LoadMetaData
*
*/
public class PartitionFilterExtractor extends FilterExtractor {
Modified: pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java Wed Aug 6 01:04:46 2014
@@ -25,6 +25,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.newplan.logical.expression.AddExpression;
import org.apache.pig.newplan.logical.expression.AndExpression;
import org.apache.pig.newplan.logical.expression.BinaryExpression;
+import org.apache.pig.newplan.logical.expression.CastExpression;
import org.apache.pig.newplan.logical.expression.DivideExpression;
import org.apache.pig.newplan.logical.expression.EqualExpression;
import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
@@ -125,7 +126,9 @@ public class PredicatePushDownFilterExtr
@Override
protected boolean isSupportedOpType(UnaryExpression unaryOp) {
- if(unaryOp instanceof IsNullExpression) {
+ if (unaryOp instanceof CastExpression) {
+ return true;
+ } else if(unaryOp instanceof IsNullExpression) {
return supportedOpTypes.contains(OpType.OP_NULL);
} else if(unaryOp instanceof NotExpression) {
return supportedOpTypes.contains(OpType.OP_NOT);
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java Wed Aug 6 01:04:46 2014
@@ -92,6 +92,8 @@ public class PredicatePushdownOptimizer
private OperatorSubPlan subPlan;
+ private boolean planChanged;
+
@Override
public boolean check(OperatorPlan matched) throws FrontendException {
loLoad = (LOLoad)matched.getSources().get(0);
@@ -127,10 +129,10 @@ public class PredicatePushdownOptimizer
@Override
public OperatorPlan reportChanges() {
- // Return null in case predicate pushdown is just a hint which means the plan hasn't changed.
+ // Return null in case there is no predicate pushdown filter extracted or it is just
+ // a hint which means the plan hasn't changed.
// If not return the modified plan which has filters removed.
- return null;
- //return subPlan; TODO: implement filter removal
+ return planChanged ? subPlan : null;
}
@Override
@@ -155,6 +157,18 @@ public class PredicatePushdownOptimizer
} catch (IOException e) {
throw new FrontendException( e );
}
+
+ //TODO: PIG-4093
+ /*
+ if (loadPredPushdown.removeFilterPredicateFromPlan()) {
+ planChanged = true;
+ if(filterFinder.isFilterRemovable()) {
+ currentPlan.removeAndReconnect( loFilter );
+ } else {
+ loFilter.setFilterPlan(filterFinder.getFilteredPlan());
+ }
+ }
+ */
}
}
Modified: pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java Wed Aug 6 01:04:46 2014
@@ -62,13 +62,15 @@ public class TestOrcStoragePushdown {
private static List<OpType> supportedOpTypes;
private static MiniGenericCluster cluster;
- private PigServer pigServer;
+ private static PigServer pigServer;
private String query = "a = load 'foo' as (srcid:int, mrkt:chararray, dstid:int, name:chararray, " +
"age:int, browser:map[], location:tuple(country:chararray, zip:int));";
private OrcStorage orcStorage;
private static final String basedir = "test/org/apache/pig/builtin/orc/";
- private static final String outbasedir = System.getProperty("user.dir") + "/build/test/TestOrcStorage/";
+ private static final String inpbasedir = System.getProperty("user.dir") + "/build/test/TestOrcStorage_in/";
+ private static final String outbasedir = System.getProperty("user.dir") + "/build/test/TestOrcStorage_out/";
+ private static String INPUT = inpbasedir + "TestOrcStorage_1";
private static String OUTPUT1 = outbasedir + "TestOrcStorage_1";
private static String OUTPUT2 = outbasedir + "TestOrcStorage_2";
private static String OUTPUT3 = outbasedir + "TestOrcStorage_3";
@@ -77,9 +79,10 @@ public class TestOrcStoragePushdown {
private static File logFile;
@BeforeClass
- public static void oneTimeSetup() throws IOException{
+ public static void oneTimeSetup() throws Exception{
cluster = MiniGenericCluster.buildCluster();
Util.copyFromLocalToCluster(cluster, basedir + "orc-file-11-format.orc", basedir + "orc-file-11-format.orc");
+ createInputData();
if(Util.WINDOWS){
OUTPUT1 = OUTPUT1.replace("\\", "/");
@@ -111,9 +114,45 @@ public class TestOrcStoragePushdown {
logger.addAppender(appender);
}
+ private static void createInputData() throws Exception {
+ pigServer = new PigServer(ExecType.LOCAL);
+
+ new File(inpbasedir).mkdirs();
+ new File(outbasedir).mkdirs();
+ String inputTxtFile = inpbasedir + File.separator + "input.txt";
+ BufferedWriter bw = new BufferedWriter(new FileWriter(inputTxtFile));
+ long[] lVal = new long[] {100L, 200L, 300L};
+ float[] fVal = new float[] {50.0f, 100.0f, 200.0f, 300.0f};
+ double[] dVal = new double[] {1000.11, 2000.22, 3000.33};
+ StringBuilder sb = new StringBuilder();
+ for (int i=1; i <= 10000; i++) {
+ sb.append((i > 900 && i < 1100) ? true : false).append("\t"); //boolean
+ sb.append((i > 1000 && i < 3000) ? 1 : 5).append("\t"); //byte
+ sb.append((i > 2500 && i <= 4500) ? 100 : 200).append("\t"); //short
+ sb.append(i).append("\t"); //int
+ sb.append(lVal[i%3]).append("\t"); //long
+ sb.append(fVal[i%4]).append("\t"); //float
+ sb.append((i > 2500 && i < 3500) ? dVal[i%3] : dVal[i%1]).append("\t"); //double
+ sb.append((i%2 == 1 ? "" : RandomStringUtils.random(100))).append("\t"); //bytearray
+ sb.append((i%2 == 0 ? "" : RandomStringUtils.random(100))).append("\n"); //string
+ //sb.append("").append("\t"); //datetime
+ //sb.append("").append("\n"); //bigdecimal
+ bw.write(sb.toString());
+ sb.setLength(0);
+ }
+ bw.close();
+
+ // Store only 1000 rows in each row block (MIN_ROW_INDEX_STRIDE is 1000. So can't use less than that)
+ pigServer.registerQuery("A = load '" + inputTxtFile + "' as (f1:boolean, f2:int, f3:int, f4:int, f5:long, f6:float, f7:double, f8:bytearray, f9:chararray);");//, f10:datetime, f11:bigdecimal);");
+ pigServer.registerQuery("store A into '" + INPUT +"' using OrcStorage('-r 1000');");
+ Util.copyFromLocalToCluster(cluster, INPUT, INPUT);
+ }
+
@AfterClass
public static void tearDownAfterClass() throws Exception {
+ Util.deleteDirectory(new File(inpbasedir));
if (cluster != null) {
+ Util.deleteFile(cluster, inpbasedir);
cluster.shutDown();
}
}
@@ -137,6 +176,26 @@ public class TestOrcStoragePushdown {
}
@Test
+ public void testColumnPruning() throws Exception {
+ Util.resetStateForExecModeSwitch();
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+
+ pigServer.registerQuery("A = load '" + basedir + "orc-file-11-format.orc' using OrcStorage();");
+ ExecJob job = pigServer.store("A", OUTPUT1);
+ JobStats stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
+ long bytesWithoutPushdown = stats.getHdfsBytesRead();
+
+ pigServer.registerQuery("PRUNE = load '" + basedir + "orc-file-11-format.orc' using OrcStorage();");
+ pigServer.registerQuery("PRUNE = foreach PRUNE generate boolean1;");
+ job = pigServer.store("PRUNE", OUTPUT2);
+ Util.checkLogFileMessage(logFile, new String[]{"Columns pruned for PRUNE: $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13"}, true);
+ stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
+ long bytesWithPushdown = stats.getHdfsBytesRead();
+
+ assertTrue((bytesWithoutPushdown - bytesWithPushdown) > 300000);
+ }
+
+ @Test
public void testSimple() throws Exception {
String q = query + "b = filter a by srcid == 10;" + "store b into 'out';";
Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
@@ -184,7 +243,6 @@ public class TestOrcStoragePushdown {
String q = query + "b = filter a by srcid == 10 or srcid == 11;" + "store b into 'out';";
Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
SearchArgument sarg = orcStorage.getSearchArgument(expr);
- System.out.println(sarg);
assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
"leaf-1 = (EQUALS srcid 11)\n" +
"expr = (or leaf-0 leaf-1)", sarg.toString());
@@ -232,79 +290,106 @@ public class TestOrcStoragePushdown {
"expr = leaf-0", sarg.toString());
}
- // Minicluster tests which verify stats
+ //@Test
+ public void testPredicatePushdownBoolean() throws Exception {
+ testPredicatePushdownLocal("f1 == true", 10);
+ }
@Test
- public void testColumnPruneBytesRead() throws Exception {
- Util.resetStateForExecModeSwitch();
- pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ public void testPredicatePushdownByteShort() throws Exception {
+ //TODO: BytesWithoutPushdown was 2373190 and bytesWithPushdown was 1929669
+ // Expected to see more difference only when 3 out of 10 blocks are read. Other tests too.
+ // Investigate why.
+ testPredicatePushdown("f2 != 5 or f3 == 100", 3500, 400000);
+ }
- pigServer.registerQuery("A = load '" + basedir + "orc-file-11-format.orc' using OrcStorage();");
- ExecJob job = pigServer.store("A", OUTPUT1);
- JobStats stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
- long bytesWithoutPushdown = stats.getHdfsBytesRead();
+ @Test
+ public void testPredicatePushdownIntLongString() throws Exception {
+ testPredicatePushdown("f4 >= 980 and f4 < 1010 and (f5 == 100 or f9 is not null)", 20, 800000);
+ }
- pigServer.registerQuery("PRUNE = load '" + basedir + "orc-file-11-format.orc' using OrcStorage();");
- pigServer.registerQuery("PRUNE = foreach PRUNE generate boolean1;");
- job = pigServer.store("PRUNE", OUTPUT2);
- Util.checkLogFileMessage(logFile, new String[]{"Columns pruned for PRUNE: $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13"}, true);
- stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
- long bytesWithPushdown = stats.getHdfsBytesRead();
+ @Test
+ public void testPredicatePushdownFloatDouble() throws Exception {
+ testPredicatePushdown("f6 == 100.0 and f7 > 2000.00000001", 167, 800000);
+ }
- assertTrue((bytesWithoutPushdown - bytesWithPushdown) > 300000);
+ //@Test
+ public void testPredicatePushdownBigDecimal() throws Exception {
}
- @Test
- public void testPredicatePushdownBytesRead() throws Exception {
- new File(outbasedir).mkdirs();
- BufferedWriter bw = new BufferedWriter(new FileWriter(OUTPUT1));
- long[] f2 = new long[] {100L, 200L, 300L};
- for (int i=1; i <= 10000; i++) {
- bw.write(i + "\t" + f2[i%3] + "\t" + (i%2 == 0 ? "" : RandomStringUtils.random(100))+ "\n");
- }
- bw.close();
+ //@Test
+ public void testPredicatePushdownTimestamp() throws Exception {
+ }
- // Store only 1000 rows in each row block (MIN_ROW_INDEX_STRIDE is 1000. So can't use less than that)
- pigServer.registerQuery("A = load '" + OUTPUT1 + "' as (f1:int, f2:long, f3:chararray);");
- pigServer.registerQuery("store A into '" + OUTPUT2 +"' using OrcStorage('-r 1000');");
- Util.copyFromLocalToCluster(cluster, OUTPUT2, OUTPUT2);
+ private Expression getExpressionForTest(String query, List<String> predicateCols) throws Exception {
+ LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
+ Operator op = newLogicalPlan.getSinks().get(0);
+ LOFilter filter = (LOFilter) newLogicalPlan.getPredecessors(op).get(0);
+ PredicatePushDownFilterExtractor filterExtractor = new PredicatePushDownFilterExtractor(filter.getFilterPlan(), predicateCols, supportedOpTypes);
+ filterExtractor.visit();
+ return filterExtractor.getPushDownExpression();
+ }
+
+ // For eclipse debugging
+ private void testPredicatePushdownLocal(String filterStmt, int expectedRows) throws IOException {
+
+ PigServer pigServer_disabledRule = new PigServer(ExecType.LOCAL);
+ // Test with PredicatePushdownOptimizer disabled.
+ HashSet<String> disabledOptimizerRules = new HashSet<String>();
+ disabledOptimizerRules.add("PredicatePushdownOptimizer");
+ pigServer_disabledRule.getPigContext().getProperties().setProperty(PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
+ ObjectSerializer.serialize(disabledOptimizerRules));
+ pigServer_disabledRule.registerQuery("B = load '" + INPUT + "' using OrcStorage();");
+ pigServer_disabledRule.registerQuery("C = filter B by " + filterStmt + ";");
+
+ // Test with PredicatePushdownOptimizer enabled.
+ pigServer.registerQuery("D = load '" + INPUT + "' using OrcStorage();");
+ pigServer.registerQuery("E = filter D by " + filterStmt + ";");
+
+ //Verify that results are same
+ Util.checkQueryOutputs(pigServer_disabledRule.openIterator("C"), pigServer.openIterator("E"), expectedRows);
+ }
+
+ private void testPredicatePushdown(String filterStmt, int expectedRows, int expectedBytesReadDiff) throws IOException {
Util.resetStateForExecModeSwitch();
+ // Minicluster is required to get hdfs bytes read counter value
pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ PigServer pigServer_disabledRule = new PigServer(cluster.getExecType(), cluster.getProperties());
// Test with PredicatePushdownOptimizer disabled. All 3 blocks will be read
HashSet<String> disabledOptimizerRules = new HashSet<String>();
disabledOptimizerRules.add("PredicatePushdownOptimizer");
- pigServer.getPigContext().getProperties().setProperty(PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
+ pigServer_disabledRule.getPigContext().getProperties().setProperty(PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
ObjectSerializer.serialize(disabledOptimizerRules));
- pigServer.registerQuery("B = load '" + OUTPUT2 + "' using OrcStorage();");
- pigServer.registerQuery("C = filter B by f1 > 980 and f1 < 1010 and (f2 == 100 or f3 is not null);");
- ExecJob job = pigServer.store("C", OUTPUT3);
+ pigServer_disabledRule.registerQuery("B = load '" + INPUT + "' using OrcStorage();");
+ pigServer_disabledRule.registerQuery("C = filter B by " + filterStmt + ";");
+ ExecJob job = pigServer_disabledRule.store("C", OUTPUT3);
+ //Util.copyFromClusterToLocal(cluster, OUTPUT3 + "/part-m-00000", OUTPUT3);
JobStats stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
- assertEquals(20, stats.getRecordWrittern());
+ assertEquals(expectedRows, stats.getRecordWrittern());
long bytesWithoutPushdown = stats.getHdfsBytesRead();
// Test with PredicatePushdownOptimizer enabled. Only 2 blocks should be read
- pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY);
- pigServer.registerQuery("D = load '" + OUTPUT2 + "' using OrcStorage();");
- pigServer.registerQuery("E = filter D by f1 > 980 and f1 < 1010 and (f2 == 100 or f3 is not null);");
+ pigServer.registerQuery("D = load '" + INPUT + "' using OrcStorage();");
+ pigServer.registerQuery("E = filter D by " + filterStmt + ";");
job = pigServer.store("E", OUTPUT4);
+ //Util.copyFromClusterToLocal(cluster, OUTPUT4 + "/part-m-00000", OUTPUT4);
stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
- assertEquals(20, stats.getRecordWrittern());
+ assertEquals(expectedRows, stats.getRecordWrittern());
long bytesWithPushdown = stats.getHdfsBytesRead();
- assertTrue((bytesWithoutPushdown - bytesWithPushdown) > 300000);
- //Verify that results are same
- Util.checkQueryOutputs(pigServer.openIterator("C"), pigServer.openIterator("E"));
- }
+ System.out.println("bytesWithoutPushdown was " + bytesWithoutPushdown +
+ " and bytesWithPushdown was " + bytesWithPushdown);
+ assertTrue("BytesWithoutPushdown was " + bytesWithoutPushdown +
+ " and bytesWithPushdown was " + bytesWithPushdown,
+ (bytesWithoutPushdown - bytesWithPushdown) > expectedBytesReadDiff);
+ // Verify that results are same
+ Util.checkQueryOutputs(pigServer_disabledRule.openIterator("C"), pigServer.openIterator("E"), expectedRows);
+ pigServer_disabledRule.shutdown();
- private Expression getExpressionForTest(String query, List<String> predicateCols) throws Exception {
- LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
- Operator op = newLogicalPlan.getSinks().get(0);
- LOFilter filter = (LOFilter) newLogicalPlan.getPredecessors(op).get(0);
- PredicatePushDownFilterExtractor filterExtractor = new PredicatePushDownFilterExtractor(filter.getFilterPlan(), predicateCols, supportedOpTypes);
- filterExtractor.visit();
- return filterExtractor.getPushDownExpression();
}
+
+
}
Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1616066&r1=1616065&r2=1616066&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Wed Aug 6 01:04:46 2014
@@ -511,15 +511,19 @@ public class Util {
* @param expectedResults Expected results List to validate against
*/
static public void checkQueryOutputs(Iterator<Tuple> actualResults,
- Iterator<Tuple> expectedResults) {
+ Iterator<Tuple> expectedResults, Integer expectedRows) {
+ int count = 0;
while (expectedResults.hasNext()) {
Tuple expected = expectedResults.next();
Assert.assertTrue("Actual result has less records than expected results", actualResults.hasNext());
Tuple actual = actualResults.next();
- System.out.println("Rohini: expected : " + expected.toString());
Assert.assertEquals(expected.toString(), actual.toString());
+ count++;
}
Assert.assertFalse("Actual result has more records than expected results", actualResults.hasNext());
+ if (expectedRows != null) {
+ Assert.assertEquals((int)expectedRows, count);
+ }
}
/**