You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/01/11 20:42:09 UTC
svn commit: r1057827 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/
test/org/apache/pig/test/
Author: daijy
Date: Tue Jan 11 19:42:08 2011
New Revision: 1057827
URL: http://svn.apache.org/viewvc?rev=1057827&view=rev
Log:
PIG-1787: Error in logical plan generated
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1057827&r1=1057826&r2=1057827&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jan 11 19:42:08 2011
@@ -257,6 +257,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1787: Error in logical plan generated (daijy)
+
PIG-1791: System property mapred.output.compress, but pig-cluster-hadoop-site.xml doesn't (daijy)
PIG-1771: New logical plan: Merge schema fail if LoadFunc.getSchema return different schema with "Load...AS" (daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1057827&r1=1057826&r2=1057827&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Jan 11 19:42:08 2011
@@ -2994,7 +2994,11 @@ public class MRCompiler extends PhyPlanV
.getSuccessors(op);
if (succs==null) break;
op = succs.get(0);
- if (op instanceof POForEach) break;
+ if (op instanceof POLimit) {
+ // find operator after POLimit
+ op = sortMROp.reducePlan.getSuccessors(op).get(0);
+ break;
+ }
}
while (true) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=1057827&r1=1057826&r2=1057827&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Tue Jan 11 19:42:08 2011
@@ -105,7 +105,7 @@ public class POPackageAnnotator extends
List<MapReduceOper> preds = this.mPlan.getPredecessors(mr);
for (Iterator<MapReduceOper> it = preds.iterator(); it.hasNext();) {
MapReduceOper mrOper = it.next();
- if (mrOper.isLimitOnly())
+ if (mrOper.isLimitOnly() && !mPlan.getPredecessors(mrOper).get(0).isGlobalSort())
mrOper = this.mPlan.getPredecessors(mrOper).get(0);
lrFound += patchPackage(mrOper.reducePlan, pkg);
if(lrFound == pkg.getNumInps()) {
Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1057827&r1=1057826&r2=1057827&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Tue Jan 11 19:42:08 2011
@@ -1130,4 +1130,30 @@ public class TestEvalPipeline2 extends T
assertTrue(results.contains(t.toString()));
assertFalse(iter.hasNext());
}
+
+ // See PIG-1787
+ @Test
+ public void testOrderByLimitJoin() throws Exception{
+ String[] input1 = {
+ "1\t1",
+ "1\t2"
+ };
+
+ Util.createInputFile(cluster, "table_testOrderByLimitJoin", input1);
+
+ pigServer.registerQuery("a = load 'table_testOrderByLimitJoin' as (a0, a1);");
+ pigServer.registerQuery("b = group a by a0;");
+ pigServer.registerQuery("c = foreach b generate group as c0, COUNT(a) as c1;");
+ pigServer.registerQuery("d = order c by c1 parallel 2;");
+ pigServer.registerQuery("e = limit d 10;");
+ pigServer.registerQuery("f = join e by c0, a by a0;");
+
+ Iterator<Tuple> iter = pigServer.openIterator("f");
+
+ Tuple t = iter.next();
+ assertTrue(t.toString().equals("(1,2,1,1)"));
+ t = iter.next();
+ assertTrue(t.toString().equals("(1,2,1,2)"));
+ assertFalse(iter.hasNext());
+ }
}