You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/19 21:09:35 UTC
svn commit: r697190 - in /incubator/pig/branches/types: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
test/org/apache/pig/test/data/GoldenFiles/
Author: olga
Date: Fri Sep 19 12:09:35 2008
New Revision: 697190
URL: http://svn.apache.org/viewvc?rev=697190&view=rev
Log:
PIG-364: limit fix
Modified:
incubator/pig/branches/types/CHANGES.txt
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld
Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=697190&r1=697189&r2=697190&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Fri Sep 19 12:09:35 2008
@@ -210,4 +210,7 @@
PIG-436: alias is lost when single column is flattened (pradeepk via
olgan)
+ PIG-364: Limit returns incorrect records when we use multiple reducers
+ (daijy via olgan)
+
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=697190&r1=697189&r2=697190&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Sep 19 12:09:35 2008
@@ -213,6 +213,10 @@
RearrangeAdjuster ra = new RearrangeAdjuster(MRPlan);
ra.visit();
+ LimitAdjuster la = new LimitAdjuster(MRPlan);
+ la.visit();
+ la.adjust();
+
return MRPlan;
}
@@ -634,10 +638,55 @@
}
}
+ public void simpleConnectMapToReduce(MapReduceOper mro) throws PlanException
+ {
+ PhysicalPlan ep = new PhysicalPlan();
+ POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ prjStar.setResultType(DataType.TUPLE);
+ prjStar.setStar(true);
+ ep.add(prjStar);
+
+ List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+ eps.add(ep);
+
+ POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ lr.setIndex(0);
+ lr.setKeyType(DataType.TUPLE);
+ lr.setPlans(eps);
+ lr.setResultType(DataType.TUPLE);
+
+ mro.mapPlan.addAsLeaf(lr);
+
+ POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ pkg.setKeyType(DataType.TUPLE);
+ pkg.setNumInps(1);
+ boolean[] inner = {false};
+ pkg.setInner(inner);
+ mro.reducePlan.add(pkg);
+
+ List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+ List<Boolean> flat1 = new ArrayList<Boolean>();
+ PhysicalPlan ep1 = new PhysicalPlan();
+ POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ prj1.setResultType(DataType.TUPLE);
+ prj1.setStar(false);
+ prj1.setColumn(1);
+ prj1.setOverloaded(true);
+ ep1.add(prj1);
+ eps1.add(ep1);
+ flat1.add(true);
+ POForEach nfe1 = new POForEach(new OperatorKey(scope, nig
+ .getNextNodeId(scope)), -1, eps1, flat1);
+ nfe1.setResultType(DataType.BAG);
+
+ mro.reducePlan.addAsLeaf(nfe1);
+ }
+
public void visitLimit(POLimit op) throws VisitorException{
try{
MapReduceOper mro = compiledInputs[0];
+ mro.limit = op.getLimit();
if (!mro.isMapDone()) {
// if map plan is open, add a limit for optimization, eventually we
// will add another limit to reduce plan
@@ -646,53 +695,15 @@
if (mro.reducePlan.isEmpty())
{
- PhysicalPlan ep = new PhysicalPlan();
- POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prjStar.setResultType(DataType.TUPLE);
- prjStar.setStar(true);
- ep.add(prjStar);
-
- List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
- eps.add(ep);
-
- POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
- lr.setIndex(0);
- lr.setKeyType(DataType.TUPLE);
- lr.setPlans(eps);
- lr.setResultType(DataType.TUPLE);
- mro.mapPlan.addAsLeaf(lr);
-
- POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg.setKeyType(DataType.TUPLE);
- pkg.setNumInps(1);
- boolean[] inner = {false};
- pkg.setInner(inner);
- mro.reducePlan.add(pkg);
-
+ simpleConnectMapToReduce(mro);
+ mro.requestedParallelism = -1;
POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pLimit2.setLimit(op.getLimit());
+ pLimit2.setLimit(op.getLimit());
mro.reducePlan.addAsLeaf(pLimit2);
-
- List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
- List<Boolean> flat1 = new ArrayList<Boolean>();
- PhysicalPlan ep1 = new PhysicalPlan();
- POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prj1.setResultType(DataType.TUPLE);
- prj1.setStar(false);
- prj1.setColumn(1);
- prj1.setOverloaded(true);
- ep1.add(prj1);
- eps1.add(ep1);
- flat1.add(true);
- POForEach nfe1 = new POForEach(new OperatorKey(scope, nig
- .getNextNodeId(scope)), op.getRequestedParallelism(), eps1,
- flat1);
- nfe1.setResultType(DataType.BAG);
- curMROp.reducePlan.addAsLeaf(nfe1);
}
else
{
- log.warn("Something in the reduce plan while map plan is not done. Something wrong!");
+ log.warn("Something in the reduce plan while map plan is not done. Something wrong!");
}
} else if (mro.isMapDone() && !mro.isReduceDone()) {
// limit should add into reduce plan
@@ -700,7 +711,6 @@
} else {
log.warn("Both map and reduce phases have been done. This is unexpected while compiling!");
}
- curMROp = mro;
}catch(Exception e){
VisitorException pe = new VisitorException(e.getMessage());
pe.initCause(e);
@@ -837,6 +847,7 @@
int[] fields = getSortCols(op);
MapReduceOper quant = getQuantileJob(op, mro, fSpec, quantFile, rp, fields);
curMROp = getSortJob(op, quant, fSpec, quantFile, rp, fields);
+
if(op.isUDFComparatorUsed){
curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
}
@@ -874,6 +885,7 @@
mro.requestedParallelism = rp;
long limit = sort.getLimit();
+ mro.limit = limit;
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
@@ -963,24 +975,12 @@
fe_c1.setResultType(DataType.TUPLE);
mro.combinePlan.addAsLeaf(fe_c1);
-
POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
pLimit.setLimit(limit);
mro.combinePlan.addAsLeaf(pLimit);
List<PhysicalPlan> eps_c2 = new ArrayList<PhysicalPlan>();
eps_c2.addAll(sort.getSortPlans());
- /*
- for (int i : fields) {
- PhysicalPlan ep_c2 = new PhysicalPlan();
- POProject prj_c2 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prj_c2.setColumn(i);
- prj_c2.setOverloaded(false);
- prj_c2.setResultType(DataType.BYTEARRAY);
- ep_c2.add(prj_c2);
- eps_c2.add(ep_c2);
- }
- */
POLocalRearrange lr_c2 = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
lr_c2.setIndex(0);
@@ -1240,6 +1240,95 @@
}
}
+ private class LimitAdjuster extends MROpPlanVisitor {
+ ArrayList<MapReduceOper> opsToAdjust = new ArrayList<MapReduceOper>();
+
+ LimitAdjuster(MROperPlan plan) {
+ super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+ }
+
+ @Override
+ public void visitMROp(MapReduceOper mr) throws VisitorException {
+ // Look for map reduce operators which contains limit operator.
+ // If so and the requestedParallelism > 1, add one additional map-reduce
+ // operator with 1 reducer into the original plan
+ if (mr.limit!=-1 && mr.requestedParallelism>1)
+ {
+ opsToAdjust.add(mr);
+ }
+ }
+
+ public void adjust() throws IOException, PlanException
+ {
+ for (MapReduceOper mr:opsToAdjust)
+ {
+ if (mr.reducePlan.isEmpty()) return;
+ List<PhysicalOperator> mpLeaves = mr.reducePlan.getLeaves();
+ if (mpLeaves.size() != 1) {
+ String msg = new String("Expected reduce to have single leaf");
+ log.error(msg);
+ throw new IOException(msg);
+ }
+ PhysicalOperator mpLeaf = mpLeaves.get(0);
+ if (!(mpLeaf instanceof POStore)) {
+ String msg = new String("Expected leaf of reduce plan to " +
+ "always be POStore!");
+ log.error(msg);
+ throw new IOException(msg);
+ }
+ FileSpec oldSpec = ((POStore)mpLeaf).getSFile();
+
+ FileSpec fSpec = getTempFileSpec();
+ ((POStore)mpLeaf).setSFile(fSpec);
+ mr.setReduceDone(true);
+ MapReduceOper limitAdjustMROp = getMROp();
+ POLoad ld = getLoad();
+ ld.setLFile(fSpec);
+ limitAdjustMROp.mapPlan.add(ld);
+ POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ pLimit.setLimit(mr.limit);
+ limitAdjustMROp.mapPlan.addAsLeaf(pLimit);
+ simpleConnectMapToReduce(limitAdjustMROp);
+ POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ pLimit2.setLimit(mr.limit);
+ limitAdjustMROp.reducePlan.addAsLeaf(pLimit2);
+ POStore st = getStore();
+ st.setSFile(oldSpec);
+ limitAdjustMROp.reducePlan.addAsLeaf(st);
+ limitAdjustMROp.requestedParallelism = -1;
+
+ List<MapReduceOper> successorList = MRPlan.getSuccessors(mr);
+ MapReduceOper successors[] = null;
+
+ // Save a snapshot for successors, since we will modify MRPlan,
+ // use the list directly will be problematic
+ if (successorList!=null && successorList.size()>0)
+ {
+ successors = new MapReduceOper[successorList.size()];
+ int i=0;
+ for (MapReduceOper op:successorList)
+ successors[i++] = op;
+ }
+
+ MRPlan.add(limitAdjustMROp);
+ MRPlan.connect(mr, limitAdjustMROp);
+
+ if (successors!=null)
+ {
+ for (int i=0;i<successors.length;i++)
+ {
+ MapReduceOper nextMr = successors[i];
+ if (nextMr!=null)
+ MRPlan.disconnect(mr, nextMr);
+
+ if (nextMr!=null)
+ MRPlan.connect(limitAdjustMROp, nextMr);
+ }
+ }
+ }
+ }
+ }
+
private class FindKeyTypeVisitor extends PhyPlanVisitor {
byte keyType = DataType.UNKNOWN;
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=697190&r1=697189&r2=697190&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Sep 19 12:09:35 2008
@@ -91,6 +91,10 @@
private String scope;
int requestedParallelism = -1;
+
+ // Last POLimit value in this map reduce operator, needed by LimitAdjuster
+ // to add additional map reduce operator with 1 reducer after this
+ long limit = -1;
public MapReduceOper(OperatorKey k) {
super(k);
Modified: incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld?rev=697190&r1=697189&r2=697190&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld Fri Sep 19 12:09:35 2008
@@ -1,11 +1,11 @@
MapReduce(-1) - -174:
| Store(DummyFil:DummyLdr) - -7856319821130535798
| |
-| |---New For Each(true)[bag] - -180
-| | |
-| | Project[tuple][1] - -179
+| |---Limit - -180
| |
-| |---Limit - -178
+| |---New For Each(true)[bag] - -179
+| | |
+| | Project[tuple][1] - -178
| |
| |---Package[tuple]{tuple} - -177
| Local Rearrange[tuple]{tuple}(false) - -176