You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@pig.apache.org by da...@apache.org on 2011/08/25 06:46:57 UTC

svn commit: r1161380 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java test/org/apache/pig/test/TestEvalPipeline2.java

Author: daijy
Date: Thu Aug 25 04:46:56 2011
New Revision: 1161380

URL: http://svn.apache.org/viewvc?rev=1161380&view=rev
Log:
PIG-2231: Limit produces wrong number of records after foreach flatten

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1161380&r1=1161379&r2=1161380&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 25 04:46:56 2011
@@ -116,6 +116,8 @@ PIG-2011: Speed up TestTypedMap.java (dv
 
 BUG FIXES
 
+PIG-2231: Limit produce wrong number of records after foreach flatten (daijy)
+
 PIG-2193: Using HBaseStorage to scan 2 tables in the same Map job produces bad data (rangadi via dvryaboy)
 
 PIG-2232: "declare" document contains a typo (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1161380&r1=1161379&r2=1161380&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Aug 25 04:46:56 2011
@@ -2901,18 +2901,22 @@ public class MRCompiler extends PhyPlanV
                 } else {
                     simpleConnectMapToReduce(limitAdjustMROp);
                 }
-                POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                pLimit2.setLimit(mr.limit);
-                pLimit2.setLimitPlan(mr.limitPlan);
-                limitAdjustMROp.reducePlan.addAsLeaf(pLimit2);
-
-                // If the operator we're following has global sort set, we
-                // need to indicate that this is a limit after a sort.
-                // This will assure that we get the right sort comparator
-                // set.  Otherwise our order gets wacked (PIG-461).
+                // Need to split the original reduce plan into two mapreduce job:
+                // 1st: From the root(POPackage) to POLimit
+                // 2nd: From POLimit to leaves(POStore), duplicate POLimit
+                // The reason for doing that:
+                // 1. We need to have two map-reduce job, otherwise, we will end up with
+                //    N*M records, N is number of reducer, M is limit constant. We need 
+                //    one extra mapreduce job with 1 reducer
+                // 2. We don't want to move operator after POLimit into the first mapreduce
+                //    job, because:
+                //    * Foreach will shift the key type for second mapreduce job, see PIG-461
+                //    * Foreach flatten may generating more than M records, which get cut 
+                //      by POLimit, see PIG-2231
+                splitReducerForLimit(limitAdjustMROp, mr);
+
                 if (mr.isGlobalSort()) 
                 {
-                    fixProjectionAfterLimit(limitAdjustMROp, mr);
                     limitAdjustMROp.setLimitAfterSort(true);
                     limitAdjustMROp.setSortOrder(mr.getSortOrder());
                 }
@@ -2964,38 +2968,39 @@ public class MRCompiler extends PhyPlanV
         }
         
         // Move all operators between POLimit and POStore in reducer plan 
-        // from sortMROp to the new MROp so that the sort keys aren't lost by 
-        // projection in sortMROp.
-        private void fixProjectionAfterLimit(MapReduceOper mro,
-                MapReduceOper sortMROp) throws PlanException, VisitorException {
+        // from firstMROp to the secondMROp
+        private void splitReducerForLimit(MapReduceOper secondMROp,
+                MapReduceOper firstMROp) throws PlanException, VisitorException {
                         
-            PhysicalOperator op = sortMROp.reducePlan.getRoots().get(0);
+            PhysicalOperator op = firstMROp.reducePlan.getRoots().get(0);
             assert(op instanceof POPackage);
             
-            op = sortMROp.reducePlan.getSuccessors(op).get(0);
-            assert(op instanceof POForEach);
-            
             while (true) {
-                List<PhysicalOperator> succs = sortMROp.reducePlan
+                List<PhysicalOperator> succs = firstMROp.reducePlan
                         .getSuccessors(op);
                 if (succs==null) break;
                 op = succs.get(0);
                 if (op instanceof POLimit) {
                     // find operator after POLimit
-                    op = sortMROp.reducePlan.getSuccessors(op).get(0);
+                    op = firstMROp.reducePlan.getSuccessors(op).get(0);
                     break;
                 }
             }
             
+            POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
+            pLimit2.setLimit(firstMROp.limit);
+            pLimit2.setLimitPlan(firstMROp.limitPlan);
+            secondMROp.reducePlan.addAsLeaf(pLimit2);
+
             while (true) {
                 if (op instanceof POStore) break;
                 PhysicalOperator opToMove = op;
-                List<PhysicalOperator> succs = sortMROp.reducePlan
+                List<PhysicalOperator> succs = firstMROp.reducePlan
                         .getSuccessors(op);
                 op = succs.get(0);
                 
-                sortMROp.reducePlan.removeAndReconnect(opToMove);
-                mro.reducePlan.addAsLeaf(opToMove);
+                firstMROp.reducePlan.removeAndReconnect(opToMove);
+                secondMROp.reducePlan.addAsLeaf(opToMove);
                 
             }
         }

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1161380&r1=1161379&r2=1161380&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Thu Aug 25 04:46:56 2011
@@ -1615,4 +1615,39 @@ public class TestEvalPipeline2 {
         
         Assert.assertFalse(iter.hasNext());
     }
+    
+    // See PIG-2231
+    @Test
+    public void testLimitFlatten() throws Exception{
+        String[] input = {
+                "1\tA",
+                "1\tB",
+                "2\tC",
+                "3\tD",
+                "3\tE",
+                "3\tF"
+        };
+        
+        Util.createInputFile(cluster, "table_testLimitFlatten", input);
+        
+        pigServer.registerQuery("data = load 'table_testLimitFlatten' as (k,v);");
+        pigServer.registerQuery("grouped = GROUP data BY k;");
+        pigServer.registerQuery("selected = LIMIT grouped 2;");
+        pigServer.registerQuery("flattened = FOREACH selected GENERATE FLATTEN (data);");
+
+        Iterator<Tuple> iter = pigServer.openIterator("flattened");
+        
+        Tuple t = iter.next();
+        Assert.assertTrue(t.toString().equals("(1,A)"));
+        
+        t = iter.next();
+        Assert.assertTrue(t.toString().equals("(1,B)"));
+        
+        Assert.assertTrue(iter.hasNext());
+        
+        t = iter.next();
+        Assert.assertTrue(t.toString().equals("(2,C)"));
+        
+        Assert.assertFalse(iter.hasNext());
+    }
 }