You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/04/08 21:29:19 UTC

svn commit: r1090408 - in /pig/trunk: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java test/org/apache/pig/test/TestAccumulator.java

Author: thejas
Date: Fri Apr  8 19:29:19 2011
New Revision: 1090408

URL: http://svn.apache.org/viewvc?rev=1090408&view=rev
Log:
PIG-1963: in nested foreach, accumulative udf taking input from order-by does not get results in order (thejas)

Modified:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
    pig/trunk/test/org/apache/pig/test/TestAccumulator.java

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java?rev=1090408&r1=1090407&r2=1090408&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java Fri Apr  8 19:29:19 2011
@@ -236,7 +236,10 @@ public class AccumulatorOptimizer extend
         }
 
         if (po instanceof POProject) {
-            return true;
+            if(po.getInputs() == null )
+                return true;
+            else 
+                return checkUDFInput(po.getInputs().get(0));
         }
         
         if (po instanceof ConstantExpression) {

Modified: pig/trunk/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=1090408&r1=1090407&r2=1090408&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAccumulator.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAccumulator.java Fri Apr  8 19:29:19 2011
@@ -540,6 +540,52 @@ public class TestAccumulator extends Tes
         }      
     }
     
+    /**
+     * see PIG-1963.
+     * If there is a POSort or PODistinct still remaining in the plan 
+     * (after secondary sort optimization), accumulative mode can't 
+     * be used as they are blocking operators
+     * @throws IOException 
+     */
+    @Test
+    public void testAccumulatorOffOnSort() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        //one POSort will remain because secondary sort can be used only for one of them
+        pigServer.registerQuery("C = foreach B " +
+                        "{ " +
+                        "  o1 = order A by fruit;" +
+                        "  o2 = order A by fruit desc;" +
+                        "  generate  org.apache.pig.test.utils.AccumulativeSumBag(o1.fruit), " +
+                        "                  org.apache.pig.test.utils.AccumulativeSumBag(o2.fruit); " +
+                        "};");
+        
+        checkAccumulatorOff("C");        
+    }
+    
+    /**
+     * see PIG-1963.
+     * If there is a POSort or PODistinct still remaining in the plan 
+     * (after secondary sort optimization), accumulative mode can't 
+     * be used as they are blocking operators
+     * @throws IOException 
+     */
+    @Test
+    public void testAccumulatorOffOnDistinct() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit, category);");
+        pigServer.registerQuery("B = group A by id;");
+
+        pigServer.registerQuery("C = foreach B " +
+                        "{ " +
+                        "  o1 = order A by fruit;" +
+                        "  d2 = distinct A.category;" +
+                        "  generate  org.apache.pig.test.utils.AccumulativeSumBag(o1.fruit), " +
+                        "                  org.apache.pig.test.utils.AccumulativeSumBag(d2); " +
+                        "};");
+        
+        checkAccumulatorOff("C");        
+    }
+    
     @Test    
     public void testAccumulatorOff() throws IOException{
         pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "false");
@@ -548,8 +594,14 @@ public class TestAccumulator extends Tes
         pigServer.registerQuery("B = group A by id;");
         pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulativeSumBag(A);");
         
+        checkAccumulatorOff("C");
+        pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "true");
+        
+    }    
+    
+    private void checkAccumulatorOff(String alias) {
         try {
-            Iterator<Tuple> iter = pigServer.openIterator("C");
+            Iterator<Tuple> iter = pigServer.openIterator(alias);
             int c = 0;
             while(iter.hasNext()) {
                 iter.next();
@@ -559,8 +611,7 @@ public class TestAccumulator extends Tes
         }catch(Exception e) {
             // we should get exception
         }
-        
-    }    
+    }
     
     @Test    
     public void testAccumWithMap() throws IOException{
@@ -644,39 +695,5 @@ public class TestAccumulator extends Tes
         Util.checkQueryOutputsAfterSort(iter, expectedRes);
     } 
 
-    /**
-     * see PIG-1911 . 
-     * accumulator udf reading from a nested relational op. generate projects
-     * only the accumulator udf. using co-group
-     * @throws IOException
-     * @throws ParseException
-     */
-    @Test
-    public void testAccumAfterNestedOpCoGroup() throws IOException, ParseException{
-        // test group by
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
-        pigServer.registerQuery("B = load '" + INPUT_FILE + "' as (id:int, fruit);");
-        pigServer.registerQuery("C = cogroup A by id, B by id;");
-        pigServer.registerQuery("D = foreach C " +
-                "{ OA = order A by fruit;" +
-                "  FB = filter B by fruit != 'strawberry'; " +
-                "  generate" +
-                "     org.apache.pig.test.utils.AccumulatorBagCount(OA)," +
-                "     org.apache.pig.test.utils.AccumulativeSumBag(FB.fruit);" +
-        "}; ");                     
-
-        Iterator<Tuple> iter = pigServer.openIterator("D");
- 
-        List<Tuple> expectedRes = 
-            Util.getTuplesFromConstantTupleStrings(
-                    new String[] {
-                            "(2,'(apple)(apple)')",
-                            "(1,'(orange)')",
-                            "(3,'(pear)(pear)')",
-                            "(1,'(apple)')"
-                    });
-        Util.checkQueryOutputsAfterSort(iter, expectedRes);
-    }    
-    
 
 }