You are viewing a plain text version of this content. The canonical link for it is "here" on the original archive page (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@pig.apache.org by da...@apache.org on 2014/08/04 23:35:47 UTC
svn commit: r1615805 [2/2] - in /pig/trunk: ./ ivy/
shims/test/hadoop20/org/apache/pig/test/
shims/test/hadoop23/org/apache/pig/test/ src/org/apache/pig/
src/org/apache/pig/builtin/ src/org/apache/pig/newplan/
src/org/apache/pig/newplan/logical/optimiz...
Added: pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java?rev=1615805&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java (added)
+++ pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java Mon Aug 4 21:35:47 2014
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+import org.apache.pig.ExecType;
+import org.apache.pig.Expression;
+import org.apache.pig.Expression.OpType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PredicatePushDownFilterExtractor;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.rules.ColumnPruneVisitor;
+import org.apache.pig.test.MiniGenericCluster;
+import org.apache.pig.test.Util;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for predicate and projection pushdown in {@link OrcStorage}.
+ * <p>
+ * The first group of tests compiles a filter query into a logical plan, extracts the
+ * pushable part of the filter with {@link PredicatePushDownFilterExtractor} and checks the
+ * string form of the {@link SearchArgument} built by {@code OrcStorage.getSearchArgument}.
+ * The second group runs jobs on a {@link MiniGenericCluster} and compares the HDFS bytes
+ * read with and without column pruning / predicate pushdown.
+ */
+public class TestOrcStoragePushdown {
+
+    /** Operator types the filter extractor is allowed to push down to OrcStorage. */
+    private static List<OpType> supportedOpTypes;
+    private static MiniGenericCluster cluster;
+    private PigServer pigServer;
+    /** Load statement prefixed to every plan-only test query. */
+    private String query = "a = load 'foo' as (srcid:int, mrkt:chararray, dstid:int, name:chararray, " +
+            "age:int, browser:map[], location:tuple(country:chararray, zip:int));";
+    private OrcStorage orcStorage;
+
+    private static final String basedir = "test/org/apache/pig/builtin/orc/";
+    private static final String outbasedir = System.getProperty("user.dir") + "/build/test/TestOrcStorage/";
+    private static String OUTPUT1 = outbasedir + "TestOrcStorage_1";
+    private static String OUTPUT2 = outbasedir + "TestOrcStorage_2";
+    private static String OUTPUT3 = outbasedir + "TestOrcStorage_3";
+    private static String OUTPUT4 = outbasedir + "TestOrcStorage_4";
+
+    /** Receives ColumnPruneVisitor log output so tests can check which columns were pruned. */
+    private static File logFile;
+
+    @BeforeClass
+    public static void oneTimeSetup() throws IOException {
+        cluster = MiniGenericCluster.buildCluster();
+        Util.copyFromLocalToCluster(cluster, basedir + "orc-file-11-format.orc", basedir + "orc-file-11-format.orc");
+
+        if (Util.WINDOWS) {
+            OUTPUT1 = OUTPUT1.replace("\\", "/");
+            OUTPUT2 = OUTPUT2.replace("\\", "/");
+            OUTPUT3 = OUTPUT3.replace("\\", "/");
+            OUTPUT4 = OUTPUT4.replace("\\", "/");
+        }
+
+        supportedOpTypes = new ArrayList<OpType>();
+        supportedOpTypes.add(OpType.OP_EQ);
+        supportedOpTypes.add(OpType.OP_NE);
+        supportedOpTypes.add(OpType.OP_GT);
+        supportedOpTypes.add(OpType.OP_GE);
+        supportedOpTypes.add(OpType.OP_LT);
+        supportedOpTypes.add(OpType.OP_LE);
+        supportedOpTypes.add(OpType.OP_IN);
+        supportedOpTypes.add(OpType.OP_BETWEEN);
+        supportedOpTypes.add(OpType.OP_NULL);
+        supportedOpTypes.add(OpType.OP_NOT);
+        supportedOpTypes.add(OpType.OP_AND);
+        supportedOpTypes.add(OpType.OP_OR);
+
+        // Send ColumnPruneVisitor's INFO messages (and nothing else) to a temp file so that
+        // testColumnPruneBytesRead can look for the "Columns pruned" message.
+        Logger logger = Logger.getLogger(ColumnPruneVisitor.class);
+        logger.removeAllAppenders();
+        logger.setLevel(Level.INFO);
+        SimpleLayout layout = new SimpleLayout();
+        logFile = File.createTempFile("log", "");
+        // Don't leave the temp log behind after the test JVM exits.
+        logFile.deleteOnExit();
+        FileAppender appender = new FileAppender(layout, logFile.toString(), false, false, 0);
+        logger.addAppender(appender);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        if (cluster != null) {
+            cluster.shutDown();
+        }
+    }
+
+    @Before
+    public void setup() throws ExecException {
+        Util.resetStateForExecModeSwitch();
+        pigServer = new PigServer(ExecType.LOCAL);
+        orcStorage = new OrcStorage();
+    }
+
+    @After
+    public void teardown() throws IOException {
+        if (pigServer != null) {
+            pigServer.shutdown();
+        }
+        Util.deleteDirectory(new File(outbasedir));
+        if (cluster != null) {
+            Util.deleteFile(cluster, outbasedir);
+        }
+    }
+
+    @Test
+    public void testSimple() throws Exception {
+        String q = query + "b = filter a by srcid == 10;" + "store b into 'out';";
+        Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
+        SearchArgument sarg = orcStorage.getSearchArgument(expr);
+        assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
+                "expr = leaf-0", sarg.toString());
+    }
+
+    @Test
+    public void testAndOr() throws Exception {
+        String q = query + "b = filter a by (srcid > 10 or dstid <= 5) and name == 'foo' and mrkt is null;" + "store b into 'out';";
+        Expression expr = getExpressionForTest(q, Arrays.asList("srcid", "dstid", "name", "mrkt"));
+        SearchArgument sarg = orcStorage.getSearchArgument(expr);
+        assertEquals("leaf-0 = (LESS_THAN_EQUALS srcid 10)\n" +
+                "leaf-1 = (LESS_THAN_EQUALS dstid 5)\n" +
+                "leaf-2 = (EQUALS name foo)\n" +
+                "leaf-3 = (IS_NULL mrkt)\n" +
+                "expr = (and (or (not leaf-0) leaf-1) leaf-2 leaf-3)", sarg.toString());
+    }
+
+    @Test
+    public void testNot() throws Exception {
+        String q = query + "b = filter a by srcid != 10 and mrkt is not null;" + "store b into 'out';";
+        Expression expr = getExpressionForTest(q, Arrays.asList("srcid", "dstid", "name", "mrkt"));
+        SearchArgument sarg = orcStorage.getSearchArgument(expr);
+        assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
+                "leaf-1 = (IS_NULL mrkt)\n" +
+                "expr = (and (not leaf-0) (not leaf-1))", sarg.toString());
+    }
+
+    @Test
+    public void testBetweenExpression() throws Exception {
+        // TODO: Add support for OP_BETWEEN expression type
+        // NOTE(review): with "or" this predicate is true for every non-null srcid; a real
+        // range check would use "and". Kept as-is because the expected string depends on it.
+        String q = query + "b = filter a by srcid > 10 or srcid < 20;" + "store b into 'out';";
+        Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
+        SearchArgument sarg = orcStorage.getSearchArgument(expr);
+        assertEquals("leaf-0 = (LESS_THAN_EQUALS srcid 10)\n" +
+                "leaf-1 = (LESS_THAN srcid 20)\n" +
+                "expr = (or (not leaf-0) leaf-1)", sarg.toString());
+    }
+
+    @Test
+    public void testInExpression() throws Exception {
+        // TODO: Add support for OP_IN expression type
+        String q = query + "b = filter a by srcid == 10 or srcid == 11;" + "store b into 'out';";
+        Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
+        SearchArgument sarg = orcStorage.getSearchArgument(expr);
+        assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
+                "leaf-1 = (EQUALS srcid 11)\n" +
+                "expr = (or leaf-0 leaf-1)", sarg.toString());
+    }
+
+    @Test
+    public void testNegativeMatchesExpr() throws Exception {
+        // matches operator is not a supported op type
+        String q = query + "b = filter a by name matches 'foo*';" + "store b into 'out';";
+        Expression expr = getExpressionForTest(q, Arrays.asList("name"));
+        Assert.assertNull(expr);
+        SearchArgument sarg = orcStorage.getSearchArgument(expr);
+        Assert.assertNull(sarg);
+
+        // AND in LHS/RHS: the supported side is still pushed down
+        q = query + "b = filter a by name matches 'foo*' and srcid == 10;" + "store b into 'out';";
+        expr = getExpressionForTest(q, Arrays.asList("srcid", "name"));
+        sarg = orcStorage.getSearchArgument(expr);
+        assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
+                "expr = leaf-0", sarg.toString());
+
+        q = query + "b = filter a by srcid == 10 and name matches 'foo*';" + "store b into 'out';";
+        expr = getExpressionForTest(q, Arrays.asList("srcid", "name"));
+        sarg = orcStorage.getSearchArgument(expr);
+        assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
+                "expr = leaf-0", sarg.toString());
+
+        // OR - Nothing should be pushed
+        q = query + "b = filter a by name matches 'foo*' or srcid == 10;" + "store b into 'out';";
+        expr = getExpressionForTest(q, Arrays.asList("srcid", "name"));
+        Assert.assertNull(expr);
+        sarg = orcStorage.getSearchArgument(expr);
+        Assert.assertNull(sarg);
+    }
+
+    @Test
+    public void testUnSupportedFields() throws Exception {
+        // Struct, Map and Bag are not supported
+        // TODO: Change the test to use OrcStorage to test OrcStorage.getPredicateFields()
+        String q = query + "b = filter a by srcid == 10 and browser#'type' == 'IE';" +
+                "store b into 'out';";
+        Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
+        SearchArgument sarg = orcStorage.getSearchArgument(expr);
+        assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
+                "expr = leaf-0", sarg.toString());
+    }
+
+    // Minicluster tests which verify stats
+
+    @Test
+    public void testColumnPruneBytesRead() throws Exception {
+        // Replace the local-mode server created in setup() with one bound to the mini cluster.
+        pigServer.shutdown();
+        pigServer = null;
+        Util.resetStateForExecModeSwitch();
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+
+        // Baseline: read every column.
+        pigServer.registerQuery("A = load '" + basedir + "orc-file-11-format.orc' using OrcStorage();");
+        ExecJob job = pigServer.store("A", OUTPUT1);
+        JobStats stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
+        long bytesWithoutPushdown = stats.getHdfsBytesRead();
+
+        // Project a single column so that the remaining ones are pruned.
+        pigServer.registerQuery("PRUNE = load '" + basedir + "orc-file-11-format.orc' using OrcStorage();");
+        pigServer.registerQuery("PRUNE = foreach PRUNE generate boolean1;");
+        job = pigServer.store("PRUNE", OUTPUT2);
+        Util.checkLogFileMessage(logFile, new String[]{"Columns pruned for PRUNE: $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13"}, true);
+        stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
+        long bytesWithPushdown = stats.getHdfsBytesRead();
+
+        assertTrue((bytesWithoutPushdown - bytesWithPushdown) > 300000);
+    }
+
+    @Test
+    public void testPredicatePushdownBytesRead() throws Exception {
+        // Generate 10000 tab-separated rows: f1 = row number, f2 cycles through 100/200/300
+        // by f1 % 3, f3 is empty for even rows and 100 random characters for odd rows.
+        new File(outbasedir).mkdirs();
+        long[] f2 = new long[] {100L, 200L, 300L};
+        BufferedWriter bw = new BufferedWriter(new FileWriter(OUTPUT1));
+        try {
+            for (int i = 1; i <= 10000; i++) {
+                // RandomStringUtils.random() chooses from the set of all characters, so it can
+                // emit tabs and line terminators that would split fields or records in this
+                // text input. Blank those out so every line stays one 3-field record.
+                String f3 = (i % 2 == 0) ? "" : RandomStringUtils.random(100).replaceAll("[\\t\\r\\n]", " ");
+                bw.write(i + "\t" + f2[i % 3] + "\t" + f3 + "\n");
+            }
+        } finally {
+            bw.close();
+        }
+
+        // Store only 1000 rows in each row group. 1000 is MIN_ROW_INDEX_STRIDE, so a smaller
+        // stride cannot be used.
+        pigServer.registerQuery("A = load '" + OUTPUT1 + "' as (f1:int, f2:long, f3:chararray);");
+        pigServer.registerQuery("store A into '" + OUTPUT2 + "' using OrcStorage('-r 1000');");
+        Util.copyFromLocalToCluster(cluster, OUTPUT2, OUTPUT2);
+
+        // Replace the local-mode server created in setup() with one bound to the mini cluster.
+        pigServer.shutdown();
+        pigServer = null;
+        Util.resetStateForExecModeSwitch();
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+
+        // Test with PredicatePushdownOptimizer disabled: every row group is read.
+        HashSet<String> disabledOptimizerRules = new HashSet<String>();
+        disabledOptimizerRules.add("PredicatePushdownOptimizer");
+        pigServer.getPigContext().getProperties().setProperty(PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
+                ObjectSerializer.serialize(disabledOptimizerRules));
+        pigServer.registerQuery("B = load '" + OUTPUT2 + "' using OrcStorage();");
+        pigServer.registerQuery("C = filter B by f1 > 980 and f1 < 1010 and (f2 == 100 or f3 is not null);");
+        ExecJob job = pigServer.store("C", OUTPUT3);
+        JobStats stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
+        assertEquals(20, stats.getRecordWrittern());
+        long bytesWithoutPushdown = stats.getHdfsBytesRead();
+
+        // Test with PredicatePushdownOptimizer enabled: only the row groups holding f1 values
+        // 981..1009 (presumably the first two 1000-row groups) should be read.
+        pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY);
+        pigServer.registerQuery("D = load '" + OUTPUT2 + "' using OrcStorage();");
+        pigServer.registerQuery("E = filter D by f1 > 980 and f1 < 1010 and (f2 == 100 or f3 is not null);");
+        job = pigServer.store("E", OUTPUT4);
+        stats = (JobStats) job.getStatistics().getJobGraph().getSources().get(0);
+        assertEquals(20, stats.getRecordWrittern());
+        long bytesWithPushdown = stats.getHdfsBytesRead();
+
+        assertTrue((bytesWithoutPushdown - bytesWithPushdown) > 300000);
+        // Verify that results are same
+        Util.checkQueryOutputs(pigServer.openIterator("C"), pigServer.openIterator("E"));
+    }
+
+    /**
+     * Builds the logical plan for {@code query} and returns the part of its filter that can
+     * be pushed down for the given columns.
+     * <p>
+     * Assumes the plan's first sink is directly preceded by an {@link LOFilter}; the cast
+     * fails otherwise.
+     *
+     * @param query Pig script ending in {@code filter} followed by {@code store}
+     * @param predicateCols columns the extractor may push predicates on
+     * @return the pushdown expression from {@link PredicatePushDownFilterExtractor}, which the
+     *         tests above expect to be {@code null} when nothing can be pushed
+     */
+    private Expression getExpressionForTest(String query, List<String> predicateCols) throws Exception {
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
+        Operator op = newLogicalPlan.getSinks().get(0);
+        LOFilter filter = (LOFilter) newLogicalPlan.getPredecessors(op).get(0);
+        PredicatePushDownFilterExtractor filterExtractor = new PredicatePushDownFilterExtractor(filter.getFilterPlan(), predicateCols, supportedOpTypes);
+        filterExtractor.visit();
+        return filterExtractor.getPushDownExpression();
+    }
+
+}
Modified: pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java (original)
+++ pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java Mon Aug 4 21:35:47 2014
@@ -85,7 +85,7 @@ abstract public class MiniGenericCluster
return INSTANCE;
}
- abstract protected ExecType getExecType();
+ abstract public ExecType getExecType();
abstract protected void setupMiniDfsAndMrClusters();
Modified: pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java Mon Aug 4 21:35:47 2014
@@ -45,7 +45,7 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
-import org.apache.pig.newplan.FilterExtractor;
+import org.apache.pig.newplan.PartitionFilterExtractor;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.expression.AndExpression;
@@ -454,9 +454,8 @@ public class TestNewPartitionFilterPushD
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
- String actual =
- FilterExtractor.getExpression((LogicalExpression) filter.getFilterPlan().
- getSources().get(0)).toString();
+ String actual = new PartitionFilterExtractor(null, new ArrayList<String>())
+ .getExpression((LogicalExpression) filter.getFilterPlan().getSources().get(0)).toString();
Assert.assertEquals("checking trimmed filter expression:",
filterExpr, actual);
} else {
@@ -687,30 +686,30 @@ public class TestNewPartitionFilterPushD
}
//// helper methods ///////
- private FilterExtractor test(String query, List<String> partitionCols,
+    /**
+     * Same as the five-argument {@code test} overload that follows, with
+     * {@code unsupportedExpression} fixed to {@code false}.
+     */
+ private PartitionFilterExtractor test(String query, List<String> partitionCols,
String expPartFilterString, String expFilterString)
throws Exception {
return test(query, partitionCols, expPartFilterString, expFilterString, false);
}
- private FilterExtractor test(String query, List<String> partitionCols,
+ private PartitionFilterExtractor test(String query, List<String> partitionCols,
String expPartFilterString, String expFilterString, boolean unsupportedExpression)
throws Exception {
PigServer pigServer = new PigServer( pc );
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
- FilterExtractor pColExtractor = new FilterExtractor(
+ PartitionFilterExtractor pColExtractor = new PartitionFilterExtractor(
filter.getFilterPlan(), partitionCols);
pColExtractor.visit();
if(expPartFilterString == null) {
Assert.assertEquals("Checking partition column filter:", null,
- pColExtractor.getPColCondition());
+ pColExtractor.getPushDownExpression());
} else {
Assert.assertEquals("Checking partition column filter:",
expPartFilterString,
- pColExtractor.getPColCondition().toString());
+ pColExtractor.getPushDownExpression().toString());
}
if (expFilterString == null) {
@@ -721,7 +720,7 @@ public class TestNewPartitionFilterPushD
String actual = getTestExpression((LogicalExpression)pColExtractor.getFilteredPlan().getSources().get(0)).toString();
Assert.assertEquals("checking trimmed filter expression:", expFilterString, actual);
} else {
- String actual = FilterExtractor.getExpression((LogicalExpression)pColExtractor.getFilteredPlan().getSources().get(0)).toString();
+ String actual = pColExtractor.getExpression((LogicalExpression)pColExtractor.getFilteredPlan().getSources().get(0)).toString();
Assert.assertEquals("checking trimmed filter expression:", expFilterString, actual);
}
}
@@ -735,7 +734,7 @@ public class TestNewPartitionFilterPushD
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
- FilterExtractor extractor = new FilterExtractor(
+ PartitionFilterExtractor extractor = new PartitionFilterExtractor(
filter.getFilterPlan(), partitionCols);
extractor.visit();
Assert.assertFalse(extractor.canPushDown());
@@ -808,6 +807,7 @@ public class TestNewPartitionFilterPushD
super( p, iterations, new HashSet<String>() );
}
+ @Override
protected List<Set<Rule>> buildRuleSets() {
List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Mon Aug 4 21:35:47 2014
@@ -84,7 +84,6 @@ import org.apache.pig.impl.io.FileLocali
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
@@ -93,15 +92,9 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
-import org.apache.pig.newplan.logical.visitor.CastLineageSetter;
-import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
import org.apache.pig.newplan.logical.visitor.DanglingNestedNodeRemover;
-import org.apache.pig.newplan.logical.visitor.ScalarVisitor;
-import org.apache.pig.newplan.logical.visitor.SchemaAliasVisitor;
import org.apache.pig.newplan.logical.visitor.SortInfoSetter;
import org.apache.pig.newplan.logical.visitor.StoreAliasSetter;
-import org.apache.pig.newplan.logical.visitor.TypeCheckingRelVisitor;
-import org.apache.pig.newplan.logical.visitor.UnionOnSchemaSetter;
import org.apache.pig.parser.ParserException;
import org.apache.pig.parser.QueryParserDriver;
import org.apache.pig.tools.grunt.GruntParser;
@@ -510,6 +503,25 @@ public class Util {
Assert.assertEquals(expectedResults.size(), count);
}
+    /**
+     * Helper function to check that the result of a Pig query matches the expected
+     * results record by record, in the same order (nothing is sorted before comparing).
+     * Records are compared by their {@code toString()} form.
+     *
+     * @param actualResults Result of the executed Pig query
+     * @param expectedResults Expected results to validate against
+     */
+    static public void checkQueryOutputs(Iterator<Tuple> actualResults,
+            Iterator<Tuple> expectedResults) {
+        int recordIndex = 0;
+        while (expectedResults.hasNext()) {
+            Tuple expected = expectedResults.next();
+            Assert.assertTrue("Actual result has less records than expected results", actualResults.hasNext());
+            Tuple actual = actualResults.next();
+            // Report which record differs; do not print every record to stdout.
+            Assert.assertEquals("Mismatch at record " + recordIndex, expected.toString(), actual.toString());
+            recordIndex++;
+        }
+        Assert.assertFalse("Actual result has more records than expected results", actualResults.hasNext());
+    }
+
/**
* Helper function to check if the result of a Pig Query is in line with
* expected results. It sorts actual and expected results before comparison
@@ -1071,7 +1083,7 @@ public class Util {
throws Exception {
LogicalPlan lp = buildLp( pigServer, query );
lp.optimize(pigServer.getPigContext());
- return ((HExecutionEngine)pigServer.getPigContext().getExecutionEngine()).compile(lp,
+ return ((HExecutionEngine)pigServer.getPigContext().getExecutionEngine()).compile(lp,
pigServer.getPigContext().getProperties());
}
@@ -1314,6 +1326,7 @@ public class Util {
final String suffix = System.getProperty("hadoopversion").equals("20") ? "1" : "2";
File baseDir = new File(".");
String[] jarNames = baseDir.list(new FilenameFilter() {
+ @Override
public boolean accept(File dir, String name) {
if (!name.matches("pig.*h" + suffix + "\\.jar")) {
return false;