You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@pig.apache.org by ga...@apache.org on 2008/07/10 20:09:37 UTC
svn commit: r675663 - in /incubator/pig/branches/types:
src/org/apache/pig/impl/mapReduceLayer/
src/org/apache/pig/impl/physicalLayer/expressionOperators/
test/org/apache/pig/test/data/GoldenFiles/ test/org/apache/pig/test/utils/
Author: gates
Date: Thu Jul 10 11:09:36 2008
New Revision: 675663
URL: http://svn.apache.org/viewvc?rev=675663&view=rev
Log:
PIG-293: Previous patches fixed the infinite loop. This patch makes ORDER BY actually work in MapReduce mode.
Modified:
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=675663&r1=675662&r2=675663&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java Thu Jul 10 11:09:36 2008
@@ -180,6 +180,7 @@
public MROperPlan compile() throws IOException, PlanException, VisitorException {
List<PhysicalOperator> leaves = plan.getLeaves();
POStore store = (POStore)leaves.get(0);
+System.out.println("store file is " + store.getSFile());
FileLocalizer.registerDeleteOnFail(store.getSFile().getFileName(), pigContext);
compile(store);
@@ -729,20 +730,27 @@
}
}
- private int[] getSortCols(POSort sort){
+ private int[] getSortCols(POSort sort) throws PlanException {
List<PhysicalPlan> plans = sort.getSortPlans();
if(plans!=null){
int[] ret = new int[plans.size()];
int i=-1;
for (PhysicalPlan plan : plans) {
+ if (((POProject)plan.getLeaves().get(0)).isStar()) return null;
ret[++i] = ((POProject)plan.getLeaves().get(0)).getColumn();
}
return ret;
}
- return null;
+ log.error("No expression plan found in POSort");
+ throw new PlanException("No Expression Plan found in POSort");
}
- public MapReduceOper getSortJob(MapReduceOper quantJob, FileSpec lFile, FileSpec quantFile, int rp, int[] fields) throws PlanException{
+ public MapReduceOper getSortJob(
+ MapReduceOper quantJob,
+ FileSpec lFile,
+ FileSpec quantFile,
+ int rp,
+ int[] fields) throws PlanException{
MapReduceOper mro = startNew(lFile, quantJob);
mro.setQuantFile(quantFile.getFileName());
mro.setGlobalSort(true);
@@ -750,23 +758,31 @@
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
- if(fields==null) {
- log.error("No Expression Plan found in POSort");
- throw new PlanException("No Expression Plan found in POSort");
- }
- for (int i : fields) {
+ if (fields == null) {
+ // This is project *
PhysicalPlan ep = new PhysicalPlan();
POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prj.setColumn(i);
+ prj.setStar(true);
prj.setOverloaded(false);
- prj.setResultType(DataType.BYTEARRAY);
+ prj.setResultType(DataType.TUPLE);
ep.add(prj);
eps1.add(ep);
+ } else {
+ for (int i : fields) {
+ PhysicalPlan ep = new PhysicalPlan();
+ POProject prj = new POProject(new OperatorKey(scope,
+ nig.getNextNodeId(scope)));
+ prj.setColumn(i);
+ prj.setOverloaded(false);
+ prj.setResultType(DataType.BYTEARRAY);
+ ep.add(prj);
+ eps1.add(ep);
+ }
}
POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
lr.setIndex(0);
- lr.setKeyType((fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
+ lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
lr.setPlans(eps1);
lr.setResultType(DataType.TUPLE);
mro.mapPlan.addAsLeaf(lr);
@@ -774,7 +790,7 @@
mro.setMapDone(true);
POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg.setKeyType((fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
+ pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
pkg.setNumInps(1);
boolean[] inner = {false};
pkg.setInner(inner);
@@ -810,19 +826,27 @@
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
List<Boolean> flat1 = new ArrayList<Boolean>();
- if(fields==null) {
- log.error("No Expression Plan found in POSort");
- throw new PlanException("No Expression Plan found in POSort");
- }
- for (int i : fields) {
+ if (fields == null) {
PhysicalPlan ep = new PhysicalPlan();
- POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prj.setColumn(i);
+ POProject prj = new POProject(new OperatorKey(scope,
+ nig.getNextNodeId(scope)));
+ prj.setStar(true);
prj.setOverloaded(false);
- prj.setResultType(DataType.BYTEARRAY);
+ prj.setResultType(DataType.TUPLE);
ep.add(prj);
eps1.add(ep);
flat1.add(true);
+ } else {
+ for (int i : fields) {
+ PhysicalPlan ep = new PhysicalPlan();
+ POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ prj.setColumn(i);
+ prj.setOverloaded(false);
+ prj.setResultType(DataType.BYTEARRAY);
+ ep.add(prj);
+ eps1.add(ep);
+ flat1.add(true);
+ }
}
POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps1,flat1);
mro.mapPlan.addAsLeaf(nfe1);
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java?rev=675663&r1=675662&r2=675663&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java Thu Jul 10 11:09:36 2008
@@ -135,7 +135,9 @@
if(res.returnStatus != POStatus.STATUS_OK){
return res;
}
- if(columns.size() == 1) {
+ if (star) {
+ return res;
+ } else if(columns.size() == 1) {
ret = inpValue.get(columns.get(0));
} else {
ArrayList<Object> objList = new ArrayList<Object>(columns.size());
Modified: incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld?rev=675663&r1=675662&r2=675663&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld Thu Jul 10 11:09:36 2008
@@ -43,10 +43,10 @@
| | |
| | Project[bag][1] - -158
| |
- | |---Package[tuple]{bytearray} - -157
- | Local Rearrange[tuple]{bytearray} - -156
+ | |---Package[tuple]{tuple} - -157
+ | Local Rearrange[tuple]{tuple} - -156
| | |
- | | Project[bytearray][0] - -155
+ | | Project[tuple][*] - -155
| |
| |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -153
|
@@ -76,7 +76,7 @@
| |
| |---New For Each(true)[tuple] - -141
| | |
- | | Project[bytearray][0] - -140
+ | | Project[tuple][*] - -140
| |
| |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.impl.builtin.RandomSampleLoader) - -138
|
Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java?rev=675663&r1=675662&r2=675663&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java Thu Jul 10 11:09:36 2008
@@ -744,6 +744,7 @@
public static POStore topStoreOp() {
POStore ret = new POStore(new OperatorKey("", r.nextLong()));
ret.setPc(pc);
+ ret.setSFile(new FileSpec("DummyFil", "DummyLdr"));
return ret;
}