You are viewing a plain-text version of this content. The canonical version is available at the original archive link (the hyperlink is not preserved in this plain-text copy).
Posted to commits@pig.apache.org by th...@apache.org on 2011/04/08 21:29:19 UTC
svn commit: r1090408 - in /pig/trunk:
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
test/org/apache/pig/test/TestAccumulator.java
Author: thejas
Date: Fri Apr 8 19:29:19 2011
New Revision: 1090408
URL: http://svn.apache.org/viewvc?rev=1090408&view=rev
Log:
PIG-1963: in a nested foreach, an accumulative UDF taking input from an order-by does not get its results in order (thejas)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
pig/trunk/test/org/apache/pig/test/TestAccumulator.java
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java?rev=1090408&r1=1090407&r2=1090408&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java Fri Apr 8 19:29:19 2011
@@ -236,7 +236,10 @@ public class AccumulatorOptimizer extend
}
if (po instanceof POProject) {
- return true;
+ if(po.getInputs() == null )
+ return true;
+ else
+ return checkUDFInput(po.getInputs().get(0));
}
if (po instanceof ConstantExpression) {
Modified: pig/trunk/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=1090408&r1=1090407&r2=1090408&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAccumulator.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAccumulator.java Fri Apr 8 19:29:19 2011
@@ -540,6 +540,52 @@ public class TestAccumulator extends Tes
}
}
+ /**
+ * see PIG-1963.
+ * If there is a POSort or PODistinct still remaining in the plan
+ * (after secondary sort optimization), accumulative mode can't
+ * be used as they are blocking operators
+ * @throws IOException
+ */
+ @Test
+ public void testAccumulatorOffOnSort() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ //one POSort will remain because secondary sort can be used only for one of them
+ pigServer.registerQuery("C = foreach B " +
+ "{ " +
+ " o1 = order A by fruit;" +
+ " o2 = order A by fruit desc;" +
+ " generate org.apache.pig.test.utils.AccumulativeSumBag(o1.fruit), " +
+ " org.apache.pig.test.utils.AccumulativeSumBag(o2.fruit); " +
+ "};");
+
+ checkAccumulatorOff("C");
+ }
+
+ /**
+ * see PIG-1963.
+ * If there is a POSort or PODistinct still remaining in the plan
+ * (after secondary sort optimization), accumulative mode can't
+ * be used as they are blocking operators
+ * @throws IOException
+ */
+ @Test
+ public void testAccumulatorOffOnDistinct() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit, category);");
+ pigServer.registerQuery("B = group A by id;");
+
+ pigServer.registerQuery("C = foreach B " +
+ "{ " +
+ " o1 = order A by fruit;" +
+ " d2 = distinct A.category;" +
+ " generate org.apache.pig.test.utils.AccumulativeSumBag(o1.fruit), " +
+ " org.apache.pig.test.utils.AccumulativeSumBag(d2); " +
+ "};");
+
+ checkAccumulatorOff("C");
+ }
+
@Test
public void testAccumulatorOff() throws IOException{
pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "false");
@@ -548,8 +594,14 @@ public class TestAccumulator extends Tes
pigServer.registerQuery("B = group A by id;");
pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulativeSumBag(A);");
+ checkAccumulatorOff("C");
+ pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "true");
+
+ }
+
+ private void checkAccumulatorOff(String alias) {
try {
- Iterator<Tuple> iter = pigServer.openIterator("C");
+ Iterator<Tuple> iter = pigServer.openIterator(alias);
int c = 0;
while(iter.hasNext()) {
iter.next();
@@ -559,8 +611,7 @@ public class TestAccumulator extends Tes
}catch(Exception e) {
// we should get exception
}
-
- }
+ }
@Test
public void testAccumWithMap() throws IOException{
@@ -644,39 +695,5 @@ public class TestAccumulator extends Tes
Util.checkQueryOutputsAfterSort(iter, expectedRes);
}
- /**
- * see PIG-1911 .
- * accumulator udf reading from a nested relational op. generate projects
- * only the accumulator udf. using co-group
- * @throws IOException
- * @throws ParseException
- */
- @Test
- public void testAccumAfterNestedOpCoGroup() throws IOException, ParseException{
- // test group by
- pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
- pigServer.registerQuery("B = load '" + INPUT_FILE + "' as (id:int, fruit);");
- pigServer.registerQuery("C = cogroup A by id, B by id;");
- pigServer.registerQuery("D = foreach C " +
- "{ OA = order A by fruit;" +
- " FB = filter B by fruit != 'strawberry'; " +
- " generate" +
- " org.apache.pig.test.utils.AccumulatorBagCount(OA)," +
- " org.apache.pig.test.utils.AccumulativeSumBag(FB.fruit);" +
- "}; ");
-
- Iterator<Tuple> iter = pigServer.openIterator("D");
-
- List<Tuple> expectedRes =
- Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "(2,'(apple)(apple)')",
- "(1,'(orange)')",
- "(3,'(pear)(pear)')",
- "(1,'(apple)')"
- });
- Util.checkQueryOutputsAfterSort(iter, expectedRes);
- }
-
}