You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@pig.apache.org by ga...@apache.org on 2008/07/29 03:27:15 UTC
svn commit: r680594 - /incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
Author: gates
Date: Mon Jul 28 18:27:14 2008
New Revision: 680594
URL: http://svn.apache.org/viewvc?rev=680594&view=rev
Log:
PIG-344 Fix sorting to work on types other than just byte array.
Modified:
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=680594&r1=680593&r2=680594&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Jul 28 18:27:14 2008
@@ -819,7 +819,7 @@
int rp = op.getRequestedParallelism();
int[] fields = getSortCols(op);
MapReduceOper quant = getQuantileJob(op, mro, fSpec, quantFile, rp, fields);
- curMROp = getSortJob(quant, fSpec, quantFile, rp, fields, op.getLimit());
+ curMROp = getSortJob(op, quant, fSpec, quantFile, rp, fields);
if(op.isUDFComparatorUsed){
curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
}
@@ -846,18 +846,22 @@
}
public MapReduceOper getSortJob(
+ POSort sort,
MapReduceOper quantJob,
FileSpec lFile,
FileSpec quantFile,
int rp,
- int[] fields,
- long limit) throws PlanException{
+ int[] fields) throws PlanException{
MapReduceOper mro = startNew(lFile, quantJob);
mro.setQuantFile(quantFile.getFileName());
mro.setGlobalSort(true);
mro.requestedParallelism = rp;
+
+ long limit = sort.getLimit();
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+
+ byte keyType = DataType.UNKNOWN;
if (fields == null) {
// This is project *
@@ -869,6 +873,7 @@
ep.add(prj);
eps1.add(ep);
} else {
+ /*
for (int i : fields) {
PhysicalPlan ep = new PhysicalPlan();
POProject prj = new POProject(new OperatorKey(scope,
@@ -879,21 +884,37 @@
ep.add(prj);
eps1.add(ep);
}
+ */
+ // Attach the sort plans to the local rearrange to get the
+ // projection.
+ eps1.addAll(sort.getSortPlans());
+
+ // Visit the first sort plan to figure out our key type. We only
+ // have to visit the first because if we have more than one plan,
+ // then the key type will be tuple.
+ try {
+ FindKeyTypeVisitor fktv =
+ new FindKeyTypeVisitor(sort.getSortPlans().get(0));
+ fktv.visit();
+ keyType = fktv.keyType;
+ } catch (VisitorException ve) {
+ throw new PlanException(ve);
+ }
}
POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
lr.setIndex(0);
- lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
+ lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
+ keyType);
lr.setPlans(eps1);
lr.setResultType(DataType.TUPLE);
mro.mapPlan.addAsLeaf(lr);
mro.setMapDone(true);
- if (limit!=-1)
- {
+ if (limit!=-1) {
POPackage pkg_c = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
+ pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
pkg_c.setNumInps(1);
//pkg.setResultType(DataType.TUPLE);
boolean[] inner = {false};
@@ -921,6 +942,8 @@
mro.combinePlan.addAsLeaf(pLimit);
List<PhysicalPlan> eps_c2 = new ArrayList<PhysicalPlan>();
+ eps_c2.addAll(sort.getSortPlans());
+ /*
for (int i : fields) {
PhysicalPlan ep_c2 = new PhysicalPlan();
POProject prj_c2 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
@@ -930,17 +953,19 @@
ep_c2.add(prj_c2);
eps_c2.add(ep_c2);
}
+ */
POLocalRearrange lr_c2 = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
lr_c2.setIndex(0);
- lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
+ lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
lr_c2.setPlans(eps_c2);
lr_c2.setResultType(DataType.TUPLE);
mro.combinePlan.addAsLeaf(lr_c2);
}
POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
+ pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
+ keyType);
pkg.setNumInps(1);
boolean[] inner = {false};
pkg.setInner(inner);
@@ -1186,5 +1211,20 @@
}
}
}
+
+ private class FindKeyTypeVisitor extends PhyPlanVisitor {
+
+ byte keyType = DataType.UNKNOWN;
+
+ FindKeyTypeVisitor(PhysicalPlan plan) {
+ super(plan,
+ new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+ }
+
+ @Override
+ public void visitProject(POProject p) throws VisitorException {
+ keyType = p.getResultType();
+ }
+ }
}