You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/04/08 21:01:53 UTC
svn commit: r1090401 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
test/org/apache/pig/test/
Author: thejas
Date: Fri Apr 8 19:01:53 2011
New Revision: 1090401
URL: http://svn.apache.org/viewvc?rev=1090401&view=rev
Log:
PIG-1911: Infinite loop with accumulator function in nested foreach
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
pig/trunk/test/org/apache/pig/test/TestAccumulator.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1090401&r1=1090400&r2=1090401&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Apr 8 19:01:53 2011
@@ -140,6 +140,8 @@ PIG-1696: Performance: Use System.arrayc
BUG FIXES
+PIG-1911: Infinite loop with accumulator function in nested foreach (thejas)
+
PIG-1923: Jython UDFs fail to convert Maps of Integer values back to Pig types (julien)
PIG-1944: register javascript UDFs does not work (julien)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1090401&r1=1090400&r2=1090401&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Fri Apr 8 19:01:53 2011
@@ -65,6 +65,7 @@ public class POUserFunc extends Expressi
private MonitoredUDFExecutor executor = null;
private PhysicalOperator referencedOperator = null;
+ private boolean isAccumulationDone;
public PhysicalOperator getReferencedOperator() {
return referencedOperator;
@@ -189,9 +190,24 @@ public class POUserFunc extends Expressi
((Accumulator)func).accumulate((Tuple)result.result);
result.returnStatus = POStatus.STATUS_BATCH_OK;
result.result = null;
+ isAccumulationDone = false;
}else{
- result.result = ((Accumulator)func).getValue();
- ((Accumulator)func).cleanup();
+ if(isAccumulationDone){
+ //PORelationToExprProject does not return STATUS_EOP
+ // so that udf gets called both when isAccumStarted
+ // is first true and then set to false, even
+ //when the input relation is empty.
+ // so the STATUS_EOP has to be sent from POUserFunc,
+ // after the results have been sent.
+ result.result = null;
+ result.returnStatus = POStatus.STATUS_EOP;
+ }
+ else{
+ result.result = ((Accumulator)func).getValue();
+ result.returnStatus = POStatus.STATUS_OK;
+ ((Accumulator)func).cleanup();
+ isAccumulationDone = true;
+ }
}
} else {
if (executor != null) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1090401&r1=1090400&r2=1090401&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Fri Apr 8 19:01:53 2011
@@ -256,6 +256,7 @@ public class POForEach extends PhysicalO
}
}
+ setAccumStart();
while(true) {
if (buffer.hasNextBatch()) {
try {
@@ -263,7 +264,6 @@ public class POForEach extends PhysicalO
}catch(IOException e) {
throw new ExecException(e);
}
- setAccumStart();
}else{
inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
// buffer.clear();
Modified: pig/trunk/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=1090401&r1=1090400&r2=1090401&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAccumulator.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAccumulator.java Fri Apr 8 19:01:53 2011
@@ -20,12 +20,15 @@ package org.apache.pig.test;
import java.io.*;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
+
import junit.framework.TestCase;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -613,5 +616,67 @@ public class TestAccumulator extends Tes
}
}
+ /**
+ * Regression test for PIG-1911 (infinite loop with an accumulator function
+ * in a nested foreach).
+ * An accumulator UDF reads from a nested relational operator (a nested
+ * ORDER BY) inside a FOREACH over a GROUP BY, and GENERATE projects only
+ * the accumulator UDF.
+ * @throws IOException
+ * @throws ParseException
+ */
+ @Test
+ public void testAccumAfterNestedOp() throws IOException, ParseException{
+ // test group by, with a nested order feeding the accumulator udf
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B " +
+ "{ o = order A by id; " +
+ " generate org.apache.pig.test.utils.AccumulatorBagCount(o);}; ");
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(2)",
+ "(1)",
+ "(3)",
+ "(1)"
+ });
+ Util.checkQueryOutputsAfterSort(iter, expectedRes);
+ }
+
+ /**
+ * Regression test for PIG-1911 using COGROUP.
+ * Two accumulator UDFs each read from a different nested relational
+ * operator (a nested ORDER BY on A and a nested FILTER on B), and GENERATE
+ * projects only the accumulator UDFs.
+ * @throws IOException
+ * @throws ParseException
+ */
+ @Test
+ public void testAccumAfterNestedOpCoGroup() throws IOException, ParseException{
+ // test cogroup, with nested order and filter feeding the accumulator udfs
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("C = cogroup A by id, B by id;");
+ pigServer.registerQuery("D = foreach C " +
+ "{ OA = order A by fruit;" +
+ " FB = filter B by fruit != 'strawberry'; " +
+ " generate" +
+ " org.apache.pig.test.utils.AccumulatorBagCount(OA)," +
+ " org.apache.pig.test.utils.AccumulativeSumBag(FB.fruit);" +
+ "}; ");
+
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(2,'(apple)(apple)')",
+ "(1,'(orange)')",
+ "(3,'(pear)(pear)')",
+ "(1,'(apple)')"
+ });
+ Util.checkQueryOutputsAfterSort(iter, expectedRes);
+ }
+
}