You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/07/29 03:27:15 UTC

svn commit: r680594 - /incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java

Author: gates
Date: Mon Jul 28 18:27:14 2008
New Revision: 680594

URL: http://svn.apache.org/viewvc?rev=680594&view=rev
Log:
PIG-344 Fix sorting to work on types other than just byte array.


Modified:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=680594&r1=680593&r2=680594&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Jul 28 18:27:14 2008
@@ -819,7 +819,7 @@
             int rp = op.getRequestedParallelism();
             int[] fields = getSortCols(op);
             MapReduceOper quant = getQuantileJob(op, mro, fSpec, quantFile, rp, fields);
-            curMROp = getSortJob(quant, fSpec, quantFile, rp, fields, op.getLimit());
+            curMROp = getSortJob(op, quant, fSpec, quantFile, rp, fields);
             if(op.isUDFComparatorUsed){
                 curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
             }
@@ -846,18 +846,22 @@
     }
     
     public MapReduceOper getSortJob(
+            POSort sort,
             MapReduceOper quantJob,
             FileSpec lFile,
             FileSpec quantFile,
             int rp,
-            int[] fields,
-            long limit) throws PlanException{
+            int[] fields) throws PlanException{
         MapReduceOper mro = startNew(lFile, quantJob);
         mro.setQuantFile(quantFile.getFileName());
         mro.setGlobalSort(true);
         mro.requestedParallelism = rp;
+
+        long limit = sort.getLimit();
         
         List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+
+        byte keyType = DataType.UNKNOWN;
         
         if (fields == null) {
             // This is project *
@@ -869,6 +873,7 @@
             ep.add(prj);
             eps1.add(ep);
         } else {
+            /*
             for (int i : fields) {
                 PhysicalPlan ep = new PhysicalPlan();
                 POProject prj = new POProject(new OperatorKey(scope,
@@ -879,21 +884,37 @@
                 ep.add(prj);
                 eps1.add(ep);
             }
+            */
+            // Attach the sort plans to the local rearrange to get the
+            // projection.
+            eps1.addAll(sort.getSortPlans());
+
+            // Visit the first sort plan to figure out our key type.  We only
+            // have to visit the first because if we have more than one plan,
+            // then the key type will be tuple.
+            try {
+                FindKeyTypeVisitor fktv =
+                    new FindKeyTypeVisitor(sort.getSortPlans().get(0));
+                fktv.visit();
+                keyType = fktv.keyType;
+            } catch (VisitorException ve) {
+                throw new PlanException(ve);
+            }
         }
         
         POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
         lr.setIndex(0);
-        lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
+        lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
+            keyType);
         lr.setPlans(eps1);
         lr.setResultType(DataType.TUPLE);
         mro.mapPlan.addAsLeaf(lr);
         
         mro.setMapDone(true);
         
-        if (limit!=-1)
-        {
+        if (limit!=-1) {
         	POPackage pkg_c = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        	pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
+        	pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
             pkg_c.setNumInps(1);
             //pkg.setResultType(DataType.TUPLE);
             boolean[] inner = {false};
@@ -921,6 +942,8 @@
         	mro.combinePlan.addAsLeaf(pLimit);
             
             List<PhysicalPlan> eps_c2 = new ArrayList<PhysicalPlan>();
+            eps_c2.addAll(sort.getSortPlans());
+            /*
             for (int i : fields) {
 	            PhysicalPlan ep_c2 = new PhysicalPlan();
 	            POProject prj_c2 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
@@ -930,17 +953,19 @@
 	            ep_c2.add(prj_c2);
 	            eps_c2.add(ep_c2);
 	        }
+            */
         
 	        POLocalRearrange lr_c2 = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
 	        lr_c2.setIndex(0);
-	        lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
+	        lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
 	        lr_c2.setPlans(eps_c2);
 	        lr_c2.setResultType(DataType.TUPLE);
 	        mro.combinePlan.addAsLeaf(lr_c2);
         }
         
         POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY);
+        pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
+            keyType);
         pkg.setNumInps(1);
         boolean[] inner = {false}; 
         pkg.setInner(inner);
@@ -1186,5 +1211,20 @@
             }
         }
     }
+
+    private class FindKeyTypeVisitor extends PhyPlanVisitor {
+
+        byte keyType = DataType.UNKNOWN;
+
+        FindKeyTypeVisitor(PhysicalPlan plan) {
+            super(plan,
+                new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+
+        @Override
+        public void visitProject(POProject p) throws VisitorException {
+            keyType = p.getResultType();
+        }
+    }
     
 }