You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/07/10 20:09:37 UTC

svn commit: r675663 - in /incubator/pig/branches/types: src/org/apache/pig/impl/mapReduceLayer/ src/org/apache/pig/impl/physicalLayer/expressionOperators/ test/org/apache/pig/test/data/GoldenFiles/ test/org/apache/pig/test/utils/

Author: gates
Date: Thu Jul 10 11:09:36 2008
New Revision: 675663

URL: http://svn.apache.org/viewvc?rev=675663&view=rev
Log:
PIG-293: Previous patches solved the infinite loop. This patch makes ORDER BY in MapReduce mode actually work.


Modified:
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
    incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=675663&r1=675662&r2=675663&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java Thu Jul 10 11:09:36 2008
@@ -180,6 +180,7 @@
     public MROperPlan compile() throws IOException, PlanException, VisitorException {
         List<PhysicalOperator> leaves = plan.getLeaves();
         POStore store = (POStore)leaves.get(0);
+System.out.println("store file is " + store.getSFile());
         FileLocalizer.registerDeleteOnFail(store.getSFile().getFileName(), pigContext);
         compile(store);
         
@@ -729,20 +730,27 @@
         }
     }
     
-    private int[] getSortCols(POSort sort){
+    private int[] getSortCols(POSort sort) throws PlanException {
         List<PhysicalPlan> plans = sort.getSortPlans();
         if(plans!=null){
             int[] ret = new int[plans.size()]; 
             int i=-1;
             for (PhysicalPlan plan : plans) {
+                if (((POProject)plan.getLeaves().get(0)).isStar()) return null;
                 ret[++i] = ((POProject)plan.getLeaves().get(0)).getColumn();
             }
             return ret;
         }
-        return null;
+        log.error("No expression plan found in POSort");
+        throw new PlanException("No Expression Plan found in POSort");
     }
     
-    public MapReduceOper getSortJob(MapReduceOper quantJob, FileSpec lFile, FileSpec quantFile, int rp, int[] fields) throws PlanException{
+    public MapReduceOper getSortJob(
+            MapReduceOper quantJob,
+            FileSpec lFile,
+            FileSpec quantFile,
+            int rp,
+            int[] fields) throws PlanException{
         MapReduceOper mro = startNew(lFile, quantJob);
         mro.setQuantFile(quantFile.getFileName());
         mro.setGlobalSort(true);
@@ -750,23 +758,31 @@
         
         List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
         
-        if(fields==null) {
-            log.error("No Expression Plan found in POSort");
-            throw new PlanException("No Expression Plan found in POSort");
-        }
-        for (int i : fields) {
+        if (fields == null) {
+            // This is project *
             PhysicalPlan ep = new PhysicalPlan();
             POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-            prj.setColumn(i);
+            prj.setStar(true);
             prj.setOverloaded(false);
-            prj.setResultType(DataType.BYTEARRAY);
+            prj.setResultType(DataType.TUPLE);
             ep.add(prj);
             eps1.add(ep);
+        } else {
+            for (int i : fields) {
+                PhysicalPlan ep = new PhysicalPlan();
+                POProject prj = new POProject(new OperatorKey(scope,
+                    nig.getNextNodeId(scope)));
+                prj.setColumn(i);
+                prj.setOverloaded(false);
+                prj.setResultType(DataType.BYTEARRAY);
+                ep.add(prj);
+                eps1.add(ep);
+            }
         }
         
         POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
         lr.setIndex(0);
-        lr.setKeyType((fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
+        lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
         lr.setPlans(eps1);
         lr.setResultType(DataType.TUPLE);
         mro.mapPlan.addAsLeaf(lr);
@@ -774,7 +790,7 @@
         mro.setMapDone(true);
         
         POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        pkg.setKeyType((fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
+        pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
         pkg.setNumInps(1);
         boolean[] inner = {false}; 
         pkg.setInner(inner);
@@ -810,19 +826,27 @@
         List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
         List<Boolean> flat1 = new ArrayList<Boolean>();
         
-        if(fields==null) {
-            log.error("No Expression Plan found in POSort");
-            throw new PlanException("No Expression Plan found in POSort");
-        }
-        for (int i : fields) {
+        if (fields == null) {
             PhysicalPlan ep = new PhysicalPlan();
-            POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-            prj.setColumn(i);
+            POProject prj = new POProject(new OperatorKey(scope,
+                nig.getNextNodeId(scope)));
+            prj.setStar(true);
             prj.setOverloaded(false);
-            prj.setResultType(DataType.BYTEARRAY);
+            prj.setResultType(DataType.TUPLE);
             ep.add(prj);
             eps1.add(ep);
             flat1.add(true);
+        } else {
+            for (int i : fields) {
+                PhysicalPlan ep = new PhysicalPlan();
+                POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                prj.setColumn(i);
+                prj.setOverloaded(false);
+                prj.setResultType(DataType.BYTEARRAY);
+                ep.add(prj);
+                eps1.add(ep);
+                flat1.add(true);
+            }
         }
         POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps1,flat1);
         mro.mapPlan.addAsLeaf(nfe1);

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java?rev=675663&r1=675662&r2=675663&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java Thu Jul 10 11:09:36 2008
@@ -135,7 +135,9 @@
         if(res.returnStatus != POStatus.STATUS_OK){
             return res;
         }
-        if(columns.size() == 1) {
+        if (star) {
+            return res;
+        } else if(columns.size() == 1) {
             ret = inpValue.get(columns.get(0));
         } else {
 	        ArrayList<Object> objList = new ArrayList<Object>(columns.size()); 

Modified: incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld?rev=675663&r1=675662&r2=675663&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld Thu Jul 10 11:09:36 2008
@@ -43,10 +43,10 @@
         |               |   |
         |               |   Project[bag][1] - -158
         |               |
-        |               |---Package[tuple]{bytearray} - -157
-        |   Local Rearrange[tuple]{bytearray} - -156
+        |               |---Package[tuple]{tuple} - -157
+        |   Local Rearrange[tuple]{tuple} - -156
         |   |   |
-        |   |   Project[bytearray][0] - -155
+        |   |   Project[tuple][*] - -155
         |   |
         |   |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -153
         |
@@ -76,7 +76,7 @@
             |   |
             |   |---New For Each(true)[tuple] - -141
             |       |   |
-            |       |   Project[bytearray][0] - -140
+            |       |   Project[tuple][*] - -140
             |       |
             |       |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.impl.builtin.RandomSampleLoader) - -138
             |

Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java?rev=675663&r1=675662&r2=675663&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java Thu Jul 10 11:09:36 2008
@@ -744,6 +744,7 @@
     public static POStore topStoreOp() {
         POStore ret = new POStore(new OperatorKey("", r.nextLong()));
         ret.setPc(pc);
+        ret.setSFile(new FileSpec("DummyFil", "DummyLdr"));
         return ret;
     }