You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/08/25 06:46:57 UTC
svn commit: r1161380 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
test/org/apache/pig/test/TestEvalPipeline2.java
Author: daijy
Date: Thu Aug 25 04:46:56 2011
New Revision: 1161380
URL: http://svn.apache.org/viewvc?rev=1161380&view=rev
Log:
PIG-2231: Limit produces wrong number of records after foreach flatten
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1161380&r1=1161379&r2=1161380&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 25 04:46:56 2011
@@ -116,6 +116,8 @@ PIG-2011: Speed up TestTypedMap.java (dv
BUG FIXES
+PIG-2231: Limit produces wrong number of records after foreach flatten (daijy)
+
PIG-2193: Using HBaseStorage to scan 2 tables in the same Map job produces bad data (rangadi via dvryaboy)
PIG-2232: "declare" document contains a typo (daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1161380&r1=1161379&r2=1161380&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Aug 25 04:46:56 2011
@@ -2901,18 +2901,22 @@ public class MRCompiler extends PhyPlanV
} else {
simpleConnectMapToReduce(limitAdjustMROp);
}
- POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pLimit2.setLimit(mr.limit);
- pLimit2.setLimitPlan(mr.limitPlan);
- limitAdjustMROp.reducePlan.addAsLeaf(pLimit2);
-
- // If the operator we're following has global sort set, we
- // need to indicate that this is a limit after a sort.
- // This will assure that we get the right sort comparator
- // set. Otherwise our order gets wacked (PIG-461).
+ // Need to split the original reduce plan into two mapreduce jobs:
+ // 1st: From the root (POPackage) to POLimit
+ // 2nd: From POLimit to the leaves (POStore), with a duplicated POLimit
+ // The reasons for doing that:
+ // 1. We need two mapreduce jobs; otherwise we would end up with
+ // N*M records, where N is the number of reducers and M is the limit
+ // constant. So we need one extra mapreduce job with 1 reducer.
+ // 2. We don't want the operators after POLimit to stay in the first
+ // mapreduce job, because:
+ // * Foreach will shift the key type for the second mapreduce job, see PIG-461
+ // * Foreach flatten may generate more than M records, which would then be
+ // wrongly cut by the second POLimit, see PIG-2231
+ splitReducerForLimit(limitAdjustMROp, mr);
+
if (mr.isGlobalSort())
{
- fixProjectionAfterLimit(limitAdjustMROp, mr);
limitAdjustMROp.setLimitAfterSort(true);
limitAdjustMROp.setSortOrder(mr.getSortOrder());
}
@@ -2964,38 +2968,39 @@ public class MRCompiler extends PhyPlanV
}
// Move all operators between POLimit and POStore in reducer plan
- // from sortMROp to the new MROp so that the sort keys aren't lost by
- // projection in sortMROp.
- private void fixProjectionAfterLimit(MapReduceOper mro,
- MapReduceOper sortMROp) throws PlanException, VisitorException {
+ // from firstMROp to secondMROp, placing a duplicate of the POLimit in secondMROp first
+ private void splitReducerForLimit(MapReduceOper secondMROp,
+ MapReduceOper firstMROp) throws PlanException, VisitorException {
- PhysicalOperator op = sortMROp.reducePlan.getRoots().get(0);
+ PhysicalOperator op = firstMROp.reducePlan.getRoots().get(0);
assert(op instanceof POPackage);
- op = sortMROp.reducePlan.getSuccessors(op).get(0);
- assert(op instanceof POForEach);
-
while (true) {
- List<PhysicalOperator> succs = sortMROp.reducePlan
+ List<PhysicalOperator> succs = firstMROp.reducePlan
.getSuccessors(op);
if (succs==null) break;
op = succs.get(0);
if (op instanceof POLimit) {
// find operator after POLimit
- op = sortMROp.reducePlan.getSuccessors(op).get(0);
+ op = firstMROp.reducePlan.getSuccessors(op).get(0);
break;
}
}
+ POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ pLimit2.setLimit(firstMROp.limit);
+ pLimit2.setLimitPlan(firstMROp.limitPlan);
+ secondMROp.reducePlan.addAsLeaf(pLimit2);
+
while (true) {
if (op instanceof POStore) break;
PhysicalOperator opToMove = op;
- List<PhysicalOperator> succs = sortMROp.reducePlan
+ List<PhysicalOperator> succs = firstMROp.reducePlan
.getSuccessors(op);
op = succs.get(0);
- sortMROp.reducePlan.removeAndReconnect(opToMove);
- mro.reducePlan.addAsLeaf(opToMove);
+ firstMROp.reducePlan.removeAndReconnect(opToMove);
+ secondMROp.reducePlan.addAsLeaf(opToMove);
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1161380&r1=1161379&r2=1161380&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Thu Aug 25 04:46:56 2011
@@ -1615,4 +1615,39 @@ public class TestEvalPipeline2 {
Assert.assertFalse(iter.hasNext());
}
+
+ // See PIG-2231
+ @Test
+ public void testLimitFlatten() throws Exception{
+ String[] input = {
+ "1\tA",
+ "1\tB",
+ "2\tC",
+ "3\tD",
+ "3\tE",
+ "3\tF"
+ };
+
+ Util.createInputFile(cluster, "table_testLimitFlatten", input);
+
+ pigServer.registerQuery("data = load 'table_testLimitFlatten' as (k,v);");
+ pigServer.registerQuery("grouped = GROUP data BY k;");
+ pigServer.registerQuery("selected = LIMIT grouped 2;");
+ pigServer.registerQuery("flattened = FOREACH selected GENERATE FLATTEN (data);");
+
+ Iterator<Tuple> iter = pigServer.openIterator("flattened");
+
+ Tuple t = iter.next();
+ Assert.assertTrue(t.toString().equals("(1,A)"));
+
+ t = iter.next();
+ Assert.assertTrue(t.toString().equals("(1,B)"));
+
+ Assert.assertTrue(iter.hasNext());
+
+ t = iter.next();
+ Assert.assertTrue(t.toString().equals("(2,C)"));
+
+ Assert.assertFalse(iter.hasNext());
+ }
}