You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/04/08 21:01:53 UTC

svn commit: r1090401 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ test/org/apache/pig/test/

Author: thejas
Date: Fri Apr  8 19:01:53 2011
New Revision: 1090401

URL: http://svn.apache.org/viewvc?rev=1090401&view=rev
Log:
PIG-1911: Infinite loop with accumulator function in nested foreach

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    pig/trunk/test/org/apache/pig/test/TestAccumulator.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1090401&r1=1090400&r2=1090401&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Apr  8 19:01:53 2011
@@ -140,6 +140,8 @@ PIG-1696: Performance: Use System.arrayc
 
 BUG FIXES
 
+PIG-1911: Infinite loop with accumulator function in nested foreach (thejas)
+
 PIG-1923: Jython UDFs fail to convert Maps of Integer values back to Pig types (julien)
 
 PIG-1944: register javascript UDFs does not work (julien)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1090401&r1=1090400&r2=1090401&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Fri Apr  8 19:01:53 2011
@@ -65,6 +65,7 @@ public class POUserFunc extends Expressi
     private MonitoredUDFExecutor executor = null;
 
     private PhysicalOperator referencedOperator = null;
+    private boolean isAccumulationDone;
 
     public PhysicalOperator getReferencedOperator() {
         return referencedOperator;
@@ -189,9 +190,24 @@ public class POUserFunc extends Expressi
                         ((Accumulator)func).accumulate((Tuple)result.result);
                         result.returnStatus = POStatus.STATUS_BATCH_OK;
                         result.result = null;
+                        isAccumulationDone = false;
                     }else{
-                        result.result = ((Accumulator)func).getValue();
-                        ((Accumulator)func).cleanup();
+                        if(isAccumulationDone){
+                            // PORelationToExprProject does not return STATUS_EOP,
+                            // so that the udf gets called both when isAccumStarted
+                            // is first true and then when it is set to false, even
+                            // when the input relation is empty.
+                            // So the STATUS_EOP has to be sent from POUserFunc,
+                            // after the results have been sent.
+                            result.result = null;
+                            result.returnStatus = POStatus.STATUS_EOP;
+                        }
+                        else{
+                            result.result = ((Accumulator)func).getValue();
+                            result.returnStatus = POStatus.STATUS_OK;
+                            ((Accumulator)func).cleanup();
+                            isAccumulationDone = true;
+                        }
                     }
                 } else {
                     if (executor != null) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1090401&r1=1090400&r2=1090401&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Fri Apr  8 19:01:53 2011
@@ -256,6 +256,7 @@ public class POForEach extends PhysicalO
                     }
                 }
 
+                setAccumStart();
                 while(true) {
                     if (buffer.hasNextBatch()) {
                         try {
@@ -263,7 +264,6 @@ public class POForEach extends PhysicalO
                         }catch(IOException e) {
                             throw new ExecException(e);
                         }
-                        setAccumStart();
                     }else{
                         inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
  //                       buffer.clear();

Modified: pig/trunk/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=1090401&r1=1090400&r2=1090401&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAccumulator.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAccumulator.java Fri Apr  8 19:01:53 2011
@@ -20,12 +20,15 @@ package org.apache.pig.test;
 import java.io.*;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+
 import junit.framework.TestCase;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -613,5 +616,67 @@ public class TestAccumulator extends Tes
         }      
     }
     
+    /**
+     * see PIG-1911 . 
+     * accumulator udf reading from a nested relational op. generate projects
+     * only the accumulator udf.
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testAccumAfterNestedOp() throws IOException, ParseException{
+        // test group by
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B " +
+                        "{ o = order A by id; " +
+                        "  generate org.apache.pig.test.utils.AccumulatorBagCount(o);}; ");                     
+                 
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "(2)",
+                            "(1)",
+                            "(3)",
+                            "(1)"
+                    });
+        Util.checkQueryOutputsAfterSort(iter, expectedRes);
+    } 
+
+    /**
+     * see PIG-1911 . 
+     * accumulator udf reading from a nested relational op. generate projects
+     * only the accumulator udf. using co-group
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testAccumAfterNestedOpCoGroup() throws IOException, ParseException{
+        // test cogroup
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("C = cogroup A by id, B by id;");
+        pigServer.registerQuery("D = foreach C " +
+                "{ OA = order A by fruit;" +
+                "  FB = filter B by fruit != 'strawberry'; " +
+                "  generate" +
+                "     org.apache.pig.test.utils.AccumulatorBagCount(OA)," +
+                "     org.apache.pig.test.utils.AccumulativeSumBag(FB.fruit);" +
+        "}; ");                     
+
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+ 
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "(2,'(apple)(apple)')",
+                            "(1,'(orange)')",
+                            "(3,'(pear)(pear)')",
+                            "(1,'(apple)')"
+                    });
+        Util.checkQueryOutputsAfterSort(iter, expectedRes);
+    }    
+    
 
 }