Posted to commits@pig.apache.org by da...@apache.org on 2011/09/23 20:17:24 UTC
svn commit: r1174935 - in /pig/branches/branch-0.9: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
test/org/apache/pig/test/
Author: daijy
Date: Fri Sep 23 18:17:24 2011
New Revision: 1174935
URL: http://svn.apache.org/viewvc?rev=1174935&view=rev
Log:
PIG-2237: LIMIT generates wrong number of records if pig determines no of reducers as more than 1
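For context: when a job whose reduce plan contains LIMIT M runs with N > 1 reducers, each reducer applies the limit independently, so the job can emit up to N*M records (e.g. 3 reducers and LIMIT 2 can yield 6 records instead of 2). The fix appends one extra single-reducer map-reduce job that re-applies the limit; see LimitAdjuster.adjust() below.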
Added:
pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java
pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java
Modified:
pig/branches/branch-0.9/CHANGES.txt
pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/branches/branch-0.9/test/org/apache/pig/test/TestEvalPipeline2.java
pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java
Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1174935&r1=1174934&r2=1174935&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Fri Sep 23 18:17:24 2011
@@ -38,6 +38,8 @@ PIG-2221: Couldnt find documentation for
BUG FIXES
+PIG-2237: LIMIT generates wrong number of records if pig determines no of reducers as more than 1 (daijy)
+
PIG-2261: Restore support for parenthesis in Pig 0.9 (rding via daijy)
PIG-2238: Pig 0.9 error message not useful as compared to 0.8 (daijy)
Added: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java?rev=1174935&view=auto
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java (added)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java Fri Sep 23 18:17:24 2011
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Utils;
+
+public class LimitAdjuster extends MROpPlanVisitor {
+ ArrayList<MapReduceOper> opsToAdjust = new ArrayList<MapReduceOper>();
+ PigContext pigContext;
+ NodeIdGenerator nig;
+ private String scope;
+
+
+ public LimitAdjuster(MROperPlan plan, PigContext pigContext) {
+ super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+ this.pigContext = pigContext;
+ nig = NodeIdGenerator.getGenerator();
+ List<MapReduceOper> roots = plan.getRoots();
+ scope = roots.get(0).getOperatorKey().getScope();
+ }
+
+ @Override
+ public void visitMROp(MapReduceOper mr) throws VisitorException {
+ // Look for map-reduce operators that contain a limit operator.
+ // If so, and the requestedParallelism > 1, add one additional map-reduce
+ // operator with 1 reducer to the original plan
+ if (mr.limit!=-1 && mr.requestedParallelism!=1)
+ {
+ opsToAdjust.add(mr);
+ }
+ }
+
+ public void adjust() throws IOException, PlanException
+ {
+ for (MapReduceOper mr:opsToAdjust)
+ {
+ if (mr.reducePlan.isEmpty()) continue;
+ List<PhysicalOperator> mpLeaves = mr.reducePlan.getLeaves();
+ if (mpLeaves.size() != 1) {
+ int errCode = 2024;
+ String msg = "Expected reduce to have single leaf. Found " + mpLeaves.size() + " leaves.";
+ throw new MRCompilerException(msg, errCode, PigException.BUG);
+ }
+ PhysicalOperator mpLeaf = mpLeaves.get(0);
+ if (!pigContext.inIllustrator) {
+ if (!(mpLeaf instanceof POStore)) {
+ int errCode = 2025;
+ String msg = "Expected leaf of reduce plan to " +
+ "always be POStore. Found " + mpLeaf.getClass().getSimpleName();
+ throw new MRCompilerException(msg, errCode, PigException.BUG);
+ }
+ }
+ FileSpec oldSpec = ((POStore)mpLeaf).getSFile();
+ boolean oldIsTmpStore = ((POStore)mpLeaf).isTmpStore();
+
+ FileSpec fSpec = new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
+ new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
+ POStore storeOp = (POStore) mpLeaf;
+ storeOp.setSFile(fSpec);
+ storeOp.setIsTmpStore(true);
+ mr.setReduceDone(true);
+ MapReduceOper limitAdjustMROp = new MapReduceOper(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ ld.setPc(pigContext);
+ ld.setLFile(fSpec);
+ limitAdjustMROp.mapPlan.add(ld);
+ if (mr.isGlobalSort()) {
+ connectMapToReduceLimitedSort(limitAdjustMROp, mr);
+ } else {
+ MRUtil.simpleConnectMapToReduce(limitAdjustMROp, scope, nig);
+ }
+ // Need to split the original reduce plan into two map-reduce jobs:
+ // 1st: from the root (POPackage) to POLimit
+ // 2nd: from POLimit to the leaves (POStore), duplicating POLimit
+ // The reasons for doing this:
+ // 1. We need two map-reduce jobs; otherwise we end up with
+ // N*M records, where N is the number of reducers and M is the limit
+ // constant. We need one extra map-reduce job with 1 reducer
+ // 2. We don't want to move the operators after POLimit into the first
+ // map-reduce job, because:
+ // * Foreach would shift the key type for the second map-reduce job, see PIG-461
+ // * Foreach flatten may generate more than M records, which would get
+ // cut by POLimit, see PIG-2231
+ splitReducerForLimit(limitAdjustMROp, mr);
+
+ if (mr.isGlobalSort())
+ {
+ limitAdjustMROp.setLimitAfterSort(true);
+ limitAdjustMROp.setSortOrder(mr.getSortOrder());
+ }
+
+ POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ st.setSFile(oldSpec);
+ st.setIsTmpStore(oldIsTmpStore);
+ st.setSchema(((POStore)mpLeaf).getSchema());
+ limitAdjustMROp.reducePlan.addAsLeaf(st);
+ limitAdjustMROp.requestedParallelism = 1;
+ limitAdjustMROp.setLimitOnly(true);
+
+ List<MapReduceOper> successorList = mPlan.getSuccessors(mr);
+ MapReduceOper successors[] = null;
+
+ // Save a snapshot of the successors; since we will modify the MR plan,
+ // using the list directly would be problematic
+ if (successorList!=null && successorList.size()>0)
+ {
+ successors = new MapReduceOper[successorList.size()];
+ int i=0;
+ for (MapReduceOper op:successorList)
+ successors[i++] = op;
+ }
+
+ // Process UDFs
+ for (String udf : mr.UDFs) {
+ if (!limitAdjustMROp.UDFs.contains(udf)) {
+ limitAdjustMROp.UDFs.add(udf);
+ }
+ }
+
+ mPlan.add(limitAdjustMROp);
+ mPlan.connect(mr, limitAdjustMROp);
+
+ if (successors!=null)
+ {
+ for (int i=0;i<successors.length;i++)
+ {
+ MapReduceOper nextMr = successors[i];
+ if (nextMr!=null)
+ mPlan.disconnect(mr, nextMr);
+
+ if (nextMr!=null)
+ mPlan.connect(limitAdjustMROp, nextMr);
+ }
+ }
+ }
+ }
+
+ // Move all operators between POLimit and POStore in the reduce plan
+ // from firstMROp to secondMROp
+ private void splitReducerForLimit(MapReduceOper secondMROp,
+ MapReduceOper firstMROp) throws PlanException, VisitorException {
+
+ PhysicalOperator op = firstMROp.reducePlan.getRoots().get(0);
+ assert(op instanceof POPackage);
+
+ while (true) {
+ List<PhysicalOperator> succs = firstMROp.reducePlan
+ .getSuccessors(op);
+ if (succs==null) break;
+ op = succs.get(0);
+ if (op instanceof POLimit) {
+ // find operator after POLimit
+ op = firstMROp.reducePlan.getSuccessors(op).get(0);
+ break;
+ }
+ }
+
+ POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ pLimit2.setLimit(firstMROp.limit);
+ secondMROp.reducePlan.addAsLeaf(pLimit2);
+
+ while (true) {
+ if (op instanceof POStore) break;
+ PhysicalOperator opToMove = op;
+ List<PhysicalOperator> succs = firstMROp.reducePlan
+ .getSuccessors(op);
+ op = succs.get(0);
+
+ firstMROp.reducePlan.removeAndReconnect(opToMove);
+ secondMROp.reducePlan.addAsLeaf(opToMove);
+
+ }
+ }
+
+ private void connectMapToReduceLimitedSort(MapReduceOper mro, MapReduceOper sortMROp) throws PlanException, VisitorException
+ {
+ POLocalRearrange slr = (POLocalRearrange)sortMROp.mapPlan.getLeaves().get(0);
+
+ POLocalRearrange lr = null;
+ try {
+ lr = slr.clone();
+ } catch (CloneNotSupportedException e) {
+ int errCode = 2147;
+ String msg = "Error cloning POLocalRearrange for limit after sort";
+ throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+ }
+
+ mro.mapPlan.addAsLeaf(lr);
+
+ POPackage spkg = (POPackage)sortMROp.reducePlan.getRoots().get(0);
+
+ POPackage pkg = null;
+ try {
+ pkg = spkg.clone();
+ } catch (Exception e) {
+ int errCode = 2148;
+ String msg = "Error cloning POPackageLite for limit after sort";
+ throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ mro.reducePlan.add(pkg);
+ mro.reducePlan.addAsLeaf(MRUtil.getPlainForEachOP(scope, nig));
+ }
+}
\ No newline at end of file
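With this change LimitAdjuster is a standalone pass driven from MapReduceLauncher (see the launcher diff below) instead of an inner class of MRCompiler. A minimal sketch of the call sequence, assuming a compiled MROperPlan named plan and a PigContext named pc as in that diff:

    // Run limit adjustment as a whole-plan pass: visit() collects every
    // MapReduceOper with limit != -1 and requestedParallelism != 1, and
    // adjust() appends one single-reducer map-reduce job for each of them.
    LimitAdjuster la = new LimitAdjuster(plan, pc);
    la.visit();
    la.adjust();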
Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1174935&r1=1174934&r2=1174935&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Sep 23 18:17:24 2011
@@ -342,10 +342,6 @@ public class MRCompiler extends PhyPlanV
connectSoftLink();
- LimitAdjuster la = new LimitAdjuster(MRPlan);
- la.visit();
- la.adjust();
-
return MRPlan;
}
@@ -938,89 +934,6 @@ public class MRCompiler extends PhyPlanV
}
}
- private void connectMapToReduceLimitedSort(MapReduceOper mro, MapReduceOper sortMROp) throws PlanException, VisitorException
- {
- POLocalRearrange slr = (POLocalRearrange)sortMROp.mapPlan.getLeaves().get(0);
-
- POLocalRearrange lr = null;
- try {
- lr = slr.clone();
- } catch (CloneNotSupportedException e) {
- int errCode = 2147;
- String msg = "Error cloning POLocalRearrange for limit after sort";
- throw new MRCompilerException(msg, errCode, PigException.BUG, e);
- }
-
- mro.mapPlan.addAsLeaf(lr);
-
- POPackage spkg = (POPackage)sortMROp.reducePlan.getRoots().get(0);
-
- POPackage pkg = null;
- try {
- pkg = spkg.clone();
- } catch (Exception e) {
- int errCode = 2148;
- String msg = "Error cloning POPackageLite for limit after sort";
- throw new MRCompilerException(msg, errCode, PigException.BUG, e);
- }
- mro.reducePlan.add(pkg);
- mro.reducePlan.addAsLeaf(getPlainForEachOP());
- }
-
- private void simpleConnectMapToReduce(MapReduceOper mro) throws PlanException
- {
- PhysicalPlan ep = new PhysicalPlan();
- POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prjStar.setResultType(DataType.TUPLE);
- prjStar.setStar(true);
- ep.add(prjStar);
-
- List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
- eps.add(ep);
-
- POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
- try {
- lr.setIndex(0);
- } catch (ExecException e) {
- int errCode = 2058;
- String msg = "Unable to set index on the newly created POLocalRearrange.";
- throw new PlanException(msg, errCode, PigException.BUG, e);
- }
- lr.setKeyType(DataType.TUPLE);
- lr.setPlans(eps);
- lr.setResultType(DataType.TUPLE);
-
- mro.mapPlan.addAsLeaf(lr);
-
- POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg.setKeyType(DataType.TUPLE);
- pkg.setNumInps(1);
- boolean[] inner = {false};
- pkg.setInner(inner);
- mro.reducePlan.add(pkg);
-
- mro.reducePlan.addAsLeaf(getPlainForEachOP());
- }
-
- private POForEach getPlainForEachOP()
- {
- List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
- List<Boolean> flat1 = new ArrayList<Boolean>();
- PhysicalPlan ep1 = new PhysicalPlan();
- POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prj1.setResultType(DataType.TUPLE);
- prj1.setStar(false);
- prj1.setColumn(1);
- prj1.setOverloaded(true);
- ep1.add(prj1);
- eps1.add(ep1);
- flat1.add(true);
- POForEach fe = new POForEach(new OperatorKey(scope, nig
- .getNextNodeId(scope)), -1, eps1, flat1);
- fe.setResultType(DataType.BAG);
- return fe;
- }
-
@Override
public void visitLimit(POLimit op) throws VisitorException{
try{
@@ -1038,7 +951,7 @@ public class MRCompiler extends PhyPlanV
if (mro.reducePlan.isEmpty())
{
- simpleConnectMapToReduce(mro);
+ MRUtil.simpleConnectMapToReduce(mro, scope, nig);
mro.requestedParallelism = 1;
if (!pigContext.inIllustrator) {
POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
@@ -1557,7 +1470,7 @@ public class MRCompiler extends PhyPlanV
// After getting an index entry in each mapper, send all of them to one
// reducer where they will be sorted on the way by Hadoop.
- simpleConnectMapToReduce(indexerMROp);
+ MRUtil.simpleConnectMapToReduce(indexerMROp, scope, nig);
indexerMROp.requestedParallelism = 1; // we need exactly one reducer for indexing job.
@@ -1762,7 +1675,7 @@ public class MRCompiler extends PhyPlanV
// Loader of mro will return a tuple of form -
// (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
- simpleConnectMapToReduce(rightMROpr);
+ MRUtil.simpleConnectMapToReduce(rightMROpr, scope, nig);
rightMROpr.useTypedComparator(true);
POStore st = getStore();
@@ -2831,164 +2744,6 @@ public class MRCompiler extends PhyPlanV
}
- private class LimitAdjuster extends MROpPlanVisitor {
- ArrayList<MapReduceOper> opsToAdjust = new ArrayList<MapReduceOper>();
-
- LimitAdjuster(MROperPlan plan) {
- super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
- }
-
- @Override
- public void visitMROp(MapReduceOper mr) throws VisitorException {
- // Look for map reduce operators which contains limit operator.
- // If so and the requestedParallelism > 1, add one additional map-reduce
- // operator with 1 reducer into the original plan
- if (mr.limit!=-1 && mr.requestedParallelism!=1)
- {
- opsToAdjust.add(mr);
- }
- }
-
- public void adjust() throws IOException, PlanException
- {
- for (MapReduceOper mr:opsToAdjust)
- {
- if (mr.reducePlan.isEmpty()) continue;
- List<PhysicalOperator> mpLeaves = mr.reducePlan.getLeaves();
- if (mpLeaves.size() != 1) {
- int errCode = 2024;
- String msg = "Expected reduce to have single leaf. Found " + mpLeaves.size() + " leaves.";
- throw new MRCompilerException(msg, errCode, PigException.BUG);
- }
- PhysicalOperator mpLeaf = mpLeaves.get(0);
- if (!pigContext.inIllustrator)
- if (!(mpLeaf instanceof POStore)) {
- int errCode = 2025;
- String msg = "Expected leaf of reduce plan to " +
- "always be POStore. Found " + mpLeaf.getClass().getSimpleName();
- throw new MRCompilerException(msg, errCode, PigException.BUG);
- }
- FileSpec oldSpec = ((POStore)mpLeaf).getSFile();
- boolean oldIsTmpStore = ((POStore)mpLeaf).isTmpStore();
-
- FileSpec fSpec = getTempFileSpec();
- ((POStore)mpLeaf).setSFile(fSpec);
- ((POStore)mpLeaf).setIsTmpStore(true);
- mr.setReduceDone(true);
- MapReduceOper limitAdjustMROp = getMROp();
- POLoad ld = getLoad();
- ld.setLFile(fSpec);
- limitAdjustMROp.mapPlan.add(ld);
- if (mr.isGlobalSort()) {
- connectMapToReduceLimitedSort(limitAdjustMROp, mr);
- } else {
- simpleConnectMapToReduce(limitAdjustMROp);
- }
-
- // Need to split the original reduce plan into two mapreduce job:
- // 1st: From the root(POPackage) to POLimit
- // 2nd: From POLimit to leaves(POStore), duplicate POLimit
- // The reason for doing that:
- // 1. We need to have two map-reduce job, otherwise, we will end up with
- // N*M records, N is number of reducer, M is limit constant. We need
- // one extra mapreduce job with 1 reducer
- // 2. We don't want to move operator after POLimit into the first mapreduce
- // job, because:
- // * Foreach will shift the key type for second mapreduce job, see PIG-461
- // * Foreach flatten may generating more than M records, which get cut
- // by POLimit, see PIG-2231
- splitReducerForLimit(limitAdjustMROp, mr);
-
- if (mr.isGlobalSort())
- {
- limitAdjustMROp.setLimitAfterSort(true);
- limitAdjustMROp.setSortOrder(mr.getSortOrder());
- }
-
- POStore st = getStore();
- st.setSFile(oldSpec);
- st.setIsTmpStore(oldIsTmpStore);
- st.setSchema(((POStore)mpLeaf).getSchema());
- limitAdjustMROp.reducePlan.addAsLeaf(st);
- limitAdjustMROp.requestedParallelism = 1;
- limitAdjustMROp.setLimitOnly(true);
-
- List<MapReduceOper> successorList = MRPlan.getSuccessors(mr);
- MapReduceOper successors[] = null;
-
- // Save a snapshot for successors, since we will modify MRPlan,
- // use the list directly will be problematic
- if (successorList!=null && successorList.size()>0)
- {
- successors = new MapReduceOper[successorList.size()];
- int i=0;
- for (MapReduceOper op:successorList)
- successors[i++] = op;
- }
-
- // Process UDFs
- for (String udf : mr.UDFs) {
- if (!limitAdjustMROp.UDFs.contains(udf)) {
- limitAdjustMROp.UDFs.add(udf);
- }
- }
-
- MRPlan.add(limitAdjustMROp);
- MRPlan.connect(mr, limitAdjustMROp);
-
- if (successors!=null)
- {
- for (int i=0;i<successors.length;i++)
- {
- MapReduceOper nextMr = successors[i];
- if (nextMr!=null)
- MRPlan.disconnect(mr, nextMr);
-
- if (nextMr!=null)
- MRPlan.connect(limitAdjustMROp, nextMr);
- }
- }
- }
- }
-
- // Move all operators between POLimit and POStore in reducer plan
- // from firstMROp to the secondMROp
- private void splitReducerForLimit(MapReduceOper secondMROp,
- MapReduceOper firstMROp) throws PlanException, VisitorException {
-
- PhysicalOperator op = firstMROp.reducePlan.getRoots().get(0);
- assert(op instanceof POPackage);
-
- while (true) {
- List<PhysicalOperator> succs = firstMROp.reducePlan
- .getSuccessors(op);
- if (succs==null) break;
- op = succs.get(0);
- if (op instanceof POLimit) {
- // find operator after POLimit
- op = firstMROp.reducePlan.getSuccessors(op).get(0);
- break;
- }
- }
-
- POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pLimit2.setLimit(firstMROp.limit);
- secondMROp.reducePlan.addAsLeaf(pLimit2);
-
- while (true) {
- if (op instanceof POStore) break;
- PhysicalOperator opToMove = op;
- List<PhysicalOperator> succs = firstMROp.reducePlan
- .getSuccessors(op);
- op = succs.get(0);
-
- firstMROp.reducePlan.removeAndReconnect(opToMove);
- secondMROp.reducePlan.addAsLeaf(opToMove);
-
- }
- }
- }
-
private static class FindKeyTypeVisitor extends PhyPlanVisitor {
byte keyType = DataType.UNKNOWN;
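The removals above are a pure relocation: the inner LimitAdjuster class moves to the new top-level LimitAdjuster, and simpleConnectMapToReduce/getPlainForEachOP move to the new MRUtil, reworked to take PigContext, scope, and NodeIdGenerator as explicit arguments rather than reading MRCompiler's fields.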
Added: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java?rev=1174935&view=auto
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java (added)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java Fri Sep 23 18:17:24 2011
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+
+public class MRUtil {
+ // simpleConnectMapToReduce is a utility to end a map phase and start a reduce phase
+ // in a map-reduce operator:
+ // 1. mro contains only a map plan
+ // 2. we need to add a POLocalRearrange to end the map plan, and a
+ // POPackage to start the reduce plan
+ // 3. the POLocalRearrange/POPackage are trivial (identity) operators
+ static public void simpleConnectMapToReduce(MapReduceOper mro, String scope, NodeIdGenerator nig) throws PlanException
+ {
+ PhysicalPlan ep = new PhysicalPlan();
+ POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ prjStar.setResultType(DataType.TUPLE);
+ prjStar.setStar(true);
+ ep.add(prjStar);
+
+ List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+ eps.add(ep);
+
+ POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ try {
+ lr.setIndex(0);
+ } catch (ExecException e) {
+ int errCode = 2058;
+ String msg = "Unable to set index on the newly created POLocalRearrange.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
+ }
+ lr.setKeyType(DataType.TUPLE);
+ lr.setPlans(eps);
+ lr.setResultType(DataType.TUPLE);
+
+ mro.mapPlan.addAsLeaf(lr);
+
+ POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ pkg.setKeyType(DataType.TUPLE);
+ pkg.setNumInps(1);
+ boolean[] inner = {false};
+ pkg.setInner(inner);
+ mro.reducePlan.add(pkg);
+
+ mro.reducePlan.addAsLeaf(getPlainForEachOP(scope, nig));
+ }
+
+ // Get a simple POForEach: ForEach X generate flatten($1)
+ static public POForEach getPlainForEachOP(String scope, NodeIdGenerator nig)
+ {
+ List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+ List<Boolean> flat1 = new ArrayList<Boolean>();
+ PhysicalPlan ep1 = new PhysicalPlan();
+ POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ prj1.setResultType(DataType.TUPLE);
+ prj1.setStar(false);
+ prj1.setColumn(1);
+ prj1.setOverloaded(true);
+ ep1.add(prj1);
+ eps1.add(ep1);
+ flat1.add(true);
+ POForEach fe = new POForEach(new OperatorKey(scope, nig
+ .getNextNodeId(scope)), -1, eps1, flat1);
+ fe.setResultType(DataType.BAG);
+ return fe;
+ }
+}
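For reference, a sketch of how callers use MRUtil to turn a map-only operator into a full map-reduce operator with an identity shuffle; mro and scope stand for whatever operator and plan scope the caller holds:

    // After this call, mro's map plan ends in a POLocalRearrange over
    // project-star and its reduce plan is POPackage -> POForEach(flatten($1)),
    // so records pass through the shuffle unchanged.
    NodeIdGenerator nig = NodeIdGenerator.getGenerator();
    MRUtil.simpleConnectMapToReduce(mro, scope, nig);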
Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1174935&r1=1174934&r2=1174935&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Sep 23 18:17:24 2011
@@ -501,6 +501,10 @@ public class MapReduceLauncher extends L
SampleOptimizer so = new SampleOptimizer(plan, pc);
so.visit();
+ LimitAdjuster la = new LimitAdjuster(plan, pc);
+ la.visit();
+ la.adjust();
+
// Optimize to use secondary sort key if possible
prop = pc.getProperties().getProperty("pig.exec.nosecondarykey");
if (!pc.inIllustrator && !("true".equals(prop))) {
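Note the new home for the pass: it now runs at launch time, after SampleOptimizer and before the secondary-sort-key optimization, rather than inside MRCompiler.compile(). Per PIG-2237, this matters when Pig itself determines the number of reducers, a decision that is not final at compile time.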
Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1174935&r1=1174934&r2=1174935&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestEvalPipeline2.java Fri Sep 23 18:17:24 2011
@@ -1620,4 +1620,34 @@ public class TestEvalPipeline2 {
Assert.assertFalse(iter.hasNext());
}
+
+ // See PIG-2237
+ @Test
+ public void testLimitAutoReducer() throws Exception{
+ String[] input = {
+ "1\tA",
+ "4\tB",
+ "2\tC",
+ "3\tD",
+ "6\tE",
+ "5\tF"
+ };
+
+ Util.createInputFile(cluster, "table_testLimitAutoReducer", input);
+
+ pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "9");
+ pigServer.registerQuery("A = load 'table_testLimitAutoReducer' as (a0, a1);");
+ pigServer.registerQuery("B = order A by a0;");
+ pigServer.registerQuery("C = limit B 2;");
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ Tuple t = iter.next();
+ Assert.assertTrue(t.toString().equals("(1,A)"));
+
+ t = iter.next();
+ Assert.assertTrue(t.toString().equals("(2,C)"));
+
+ Assert.assertFalse(iter.hasNext());
+ }
}
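The new test reproduces the bug through reducer estimation rather than an explicit PARALLEL clause: with pig.exec.reducers.bytes.per.reducer set to 9 and roughly 24 bytes of input (six 4-byte lines), the estimator should choose about ceil(24/9) = 3 reducers for the sort, and without the LimitAdjuster pass the LIMIT 2 could return up to 3*2 = 6 records.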
Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java?rev=1174935&r1=1174934&r2=1174935&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java Fri Sep 23 18:17:24 2011
@@ -44,6 +44,7 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.GFCross;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LimitAdjuster;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
@@ -901,6 +902,11 @@ public class TestMRCompiler extends juni
PhysicalPlan pp = Util.buildPp(pigServerMR, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ LimitAdjuster la = new LimitAdjuster(mrPlan, pc);
+ la.visit();
+ la.adjust();
+
MapReduceOper mrOper = mrPlan.getRoots().get(0);
int count = 1;
@@ -995,6 +1001,11 @@ public class TestMRCompiler extends juni
PhysicalPlan pp = Util.buildPp(pigServerMR, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ LimitAdjuster la = new LimitAdjuster(mrPlan, pc);
+ la.visit();
+ la.adjust();
+
MapReduceOper mrOper = mrPlan.getRoots().get(0);
int count = 1;
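Because LimitAdjuster no longer runs inside MRCompiler.compile(), these compiler tests invoke it explicitly before inspecting the plan. A hedged sketch of an additional check one could add at this point, assuming MROperPlan exposes the usual OperatorPlan size() method:

    int sizeBefore = mrPlan.size();
    LimitAdjuster la = new LimitAdjuster(mrPlan, pc);
    la.visit();
    la.adjust();
    // adjust() adds one single-reducer job per operator that carries a
    // limit with parallelism != 1, so the plan can only grow
    assertTrue(mrPlan.size() >= sizeBefore);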