You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2010/01/05 20:37:58 UTC
svn commit: r896188 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
test/org/apache/pig/test/
Author: olga
Date: Tue Jan 5 19:37:56 2010
New Revision: 896188
URL: http://svn.apache.org/viewvc?rev=896188&view=rev
Log:
PIG-1171: Top-N queries produce incorrect results when followed by a cross statement (rding via olgan)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=896188&r1=896187&r2=896188&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jan 5 19:37:56 2010
@@ -74,6 +74,8 @@
BUG FIXES
+PIG-1171: Top-N queries produce incorrect results when followed by a cross statement (rding via olgan)
+
PIG-1159: merge join right side table does not support comma seperated paths
(rding via olgan)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=896188&r1=896187&r2=896188&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Jan 5 19:37:56 2010
@@ -575,17 +575,27 @@
jobConf.set("pig.quantilesFile", mro.getQuantFile());
jobConf.setPartitionerClass(WeightedRangePartitioner.class);
}
- if(mro.UDFs.size()==1){
- String compFuncSpec = mro.UDFs.get(0);
- Class comparator = PigContext.resolveClassName(compFuncSpec);
- if(ComparisonFunc.class.isAssignableFrom(comparator)) {
- jobConf.setMapperClass(PigMapReduce.MapWithComparator.class);
- jobConf.setReducerClass(PigMapReduce.ReduceWithComparator.class);
- jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
- jobConf.set("pig.usercomparator", "true");
- jobConf.setOutputKeyClass(NullableTuple.class);
- jobConf.setOutputKeyComparatorClass(comparator);
+
+ if (mro.isUDFComparatorUsed) {
+ boolean usercomparator = false;
+ for (String compFuncSpec : mro.UDFs) {
+ Class comparator = PigContext.resolveClassName(compFuncSpec);
+ if(ComparisonFunc.class.isAssignableFrom(comparator)) {
+ jobConf.setMapperClass(PigMapReduce.MapWithComparator.class);
+ jobConf.setReducerClass(PigMapReduce.ReduceWithComparator.class);
+ jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
+ jobConf.set("pig.usercomparator", "true");
+ jobConf.setOutputKeyClass(NullableTuple.class);
+ jobConf.setOutputKeyComparatorClass(comparator);
+ usercomparator = true;
+ break;
+ }
}
+ if (!usercomparator) {
+ String msg = "Internal error. Can't find the UDF comparator";
+ throw new IOException (msg);
+ }
+
} else {
jobConf.set("pig.sortOrder",
ObjectSerializer.serialize(mro.getSortOrder()));
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=896188&r1=896187&r2=896188&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Jan 5 19:37:56 2010
@@ -1568,6 +1568,7 @@
if(op.isUDFComparatorUsed){
curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
+ curMROp.isUDFComparatorUsed = true;
}
phyToMROpMap.put(op, curMROp);
}catch(Exception e){
@@ -1895,6 +1896,7 @@
if(sort.isUDFComparatorUsed) {
mro.UDFs.add(sort.getMSortFunc().getFuncSpec().toString());
+ curMROp.isUDFComparatorUsed = true;
}
List<Boolean> flat1 = new ArrayList<Boolean>();
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=896188&r1=896187&r2=896188&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Tue Jan 5 19:37:56 2010
@@ -107,6 +107,9 @@
public List<String> UDFs;
+ // Indicates if a UDF comparator is used
+ boolean isUDFComparatorUsed = false;
+
transient NodeIdGenerator nig;
private String scope;
@@ -133,7 +136,7 @@
// Name of the partition file generated by sampling process,
// Used by Skewed Join
private String skewedJoinPartitionFile;
-
+
public MapReduceOper(OperatorKey k) {
super(k);
mapPlan = new PhysicalPlan();
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=896188&r1=896187&r2=896188&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Tue Jan 5 19:37:56 2010
@@ -87,6 +87,63 @@
public void tearDown() throws Exception {
myPig = null;
}
+
+ public void testMultiQueryJiraPig1171() {
+
+ // Regression test for PIG-1171: two top-N pipelines (ORDER ... DESC then LIMIT 1) combined with CROSS must produce the correct single row.
+
+ String INPUT_FILE = "abc";
+
+ try {
+ // Write a three-row, tab-separated fixture locally, then copy it to the cluster under the same name.
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("1\tapple\t3");
+ w.println("2\torange\t4");
+ w.println("3\tpersimmon\t5");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ myPig.setBatchOn();
+ // Two separate load/order/limit chains over the same file; each sorts by 'a' descending and keeps one row.
+ myPig.registerQuery("A = load '" + INPUT_FILE
+ + "' as (a:long, b, c);");
+ myPig.registerQuery("A1 = Order A by a desc;");
+ myPig.registerQuery("A2 = limit A1 1;");
+ myPig.registerQuery("B = load '" + INPUT_FILE
+ + "' as (a:long, b, c);");
+ myPig.registerQuery("B1 = Order B by a desc;");
+ myPig.registerQuery("B2 = limit B1 1;");
+ // Cross the two single-row results; the expected output below is one six-field tuple.
+ myPig.registerQuery("C = cross A2, B2;");
+
+ Iterator<Tuple> iter = myPig.openIterator("C");
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(3L,'persimmon',5,3L,'persimmon',5)"
+ });
+ // Compare returned tuples, in iteration order, against the expected list using their string forms.
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ }
+ // Catches the case where fewer tuples were returned than expected (extra tuples fail inside the loop via get()).
+ assertEquals(expectedResults.size(), counter);
+
+ } catch (Exception e) { // any exception from setup, query registration, or iteration fails the test
+ e.printStackTrace();
+ Assert.fail();
+ } finally { // always remove the local fixture and the copy on the cluster
+ new File(INPUT_FILE).delete();
+ try {
+ Util.deleteFile(cluster, INPUT_FILE);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
public void testMultiQueryJiraPig1157() {