Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/19 21:09:35 UTC

svn commit: r697190 - in /incubator/pig/branches/types: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/data/GoldenFiles/

Author: olga
Date: Fri Sep 19 12:09:35 2008
New Revision: 697190

URL: http://svn.apache.org/viewvc?rev=697190&view=rev
Log:
PIG-364: limit fix
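
Background on the fix: when a LIMIT is executed by a job that runs with several
reducers, each reducer enforces the limit on its own partition only, so the job
as a whole can return up to numReducers * limit records. The LimitAdjuster added
in this commit appends an extra map-reduce operator with a single reducer that
re-applies the limit. The following minimal Java simulation of that effect is
illustrative only and is not code from this commit; the class and method names
are made up:

import java.util.ArrayList;
import java.util.List;

public class LimitPerReducerDemo {

    // Each "reducer" applies the limit to its own partition only.
    static List<Integer> reducePartition(List<Integer> partition, long limit) {
        return partition.subList(0, (int) Math.min(limit, partition.size()));
    }

    public static void main(String[] args) {
        long limit = 5;
        int numReducers = 3;

        // Fake input of 30 records, hash-partitioned across the reducers.
        List<List<Integer>> partitions = new ArrayList<List<Integer>>();
        for (int r = 0; r < numReducers; r++) {
            partitions.add(new ArrayList<Integer>());
        }
        for (int i = 0; i < 30; i++) {
            partitions.get(i % numReducers).add(i);
        }

        List<Integer> output = new ArrayList<Integer>();
        for (List<Integer> p : partitions) {
            output.addAll(reducePartition(p, limit));
        }

        // Prints 15, not 5: each of the 3 reducers kept 5 records.
        System.out.println("records with " + numReducers + " reducers: " + output.size());

        // The fix: run that output through one more single-reducer pass that
        // re-applies the limit, which is what LimitAdjuster appends to the plan.
        List<Integer> adjusted = reducePartition(output, limit);
        System.out.println("records after single-reducer pass: " + adjusted.size());
    }
}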

Modified:
    incubator/pig/branches/types/CHANGES.txt
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld

Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=697190&r1=697189&r2=697190&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Fri Sep 19 12:09:35 2008
@@ -210,4 +210,7 @@
     PIG-436: alias is lost when single column is flattened (pradeepk via
     olgan)
 
+    PIG-364: Limit returns incorrect records when we use multiple reducers
+    (daijy via olgan)
+
 

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=697190&r1=697189&r2=697190&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Sep 19 12:09:35 2008
@@ -213,6 +213,10 @@
         RearrangeAdjuster ra = new RearrangeAdjuster(MRPlan);
         ra.visit();
         
+        LimitAdjuster la = new LimitAdjuster(MRPlan);
+        la.visit();
+        la.adjust();
+        
         return MRPlan;
     }
     
@@ -634,10 +638,55 @@
         }
     }
     
+    public void simpleConnectMapToReduce(MapReduceOper mro) throws PlanException
+    {
+    	PhysicalPlan ep = new PhysicalPlan();
+        POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        prjStar.setResultType(DataType.TUPLE);
+        prjStar.setStar(true);
+        ep.add(prjStar);
+        
+        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+        eps.add(ep);
+        
+        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        lr.setIndex(0);
+        lr.setKeyType(DataType.TUPLE);
+        lr.setPlans(eps);
+        lr.setResultType(DataType.TUPLE);
+        
+        mro.mapPlan.addAsLeaf(lr);
+        
+        POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        pkg.setKeyType(DataType.TUPLE);
+        pkg.setNumInps(1);
+        boolean[] inner = {false};
+        pkg.setInner(inner);
+        mro.reducePlan.add(pkg);
+        
+        List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+        List<Boolean> flat1 = new ArrayList<Boolean>();
+        PhysicalPlan ep1 = new PhysicalPlan();
+        POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        prj1.setResultType(DataType.TUPLE);
+        prj1.setStar(false);
+        prj1.setColumn(1);
+        prj1.setOverloaded(true);
+        ep1.add(prj1);
+        eps1.add(ep1);
+        flat1.add(true);
+        POForEach nfe1 = new POForEach(new OperatorKey(scope, nig
+                .getNextNodeId(scope)), -1, eps1, flat1);
+        nfe1.setResultType(DataType.BAG);
+        
+        mro.reducePlan.addAsLeaf(nfe1);
+    }
+    
     public void visitLimit(POLimit op) throws VisitorException{
         try{
         	
             MapReduceOper mro = compiledInputs[0];
+            mro.limit = op.getLimit();
             if (!mro.isMapDone()) {
             	// if map plan is open, add a limit for optimization, eventually we
             	// will add another limit to reduce plan
@@ -646,53 +695,15 @@
                 
                 if (mro.reducePlan.isEmpty())
                 {
-                	PhysicalPlan ep = new PhysicalPlan();
-                    POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                    prjStar.setResultType(DataType.TUPLE);
-                    prjStar.setStar(true);
-                    ep.add(prjStar);
-                    
-                    List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
-                    eps.add(ep);
-                    
-                    POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                    lr.setIndex(0);
-                    lr.setKeyType(DataType.TUPLE);
-                    lr.setPlans(eps);
-                    lr.setResultType(DataType.TUPLE);
-                    mro.mapPlan.addAsLeaf(lr);
-                    
-                	POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                    pkg.setKeyType(DataType.TUPLE);
-                    pkg.setNumInps(1);
-                    boolean[] inner = {false};
-                    pkg.setInner(inner);
-                    mro.reducePlan.add(pkg);
-                    
+                    simpleConnectMapToReduce(mro);
+                    mro.requestedParallelism = -1;
                     POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        	    	pLimit2.setLimit(op.getLimit());
+                    pLimit2.setLimit(op.getLimit());
                     mro.reducePlan.addAsLeaf(pLimit2);
-                    
-                    List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
-                    List<Boolean> flat1 = new ArrayList<Boolean>();
-                    PhysicalPlan ep1 = new PhysicalPlan();
-                    POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                    prj1.setResultType(DataType.TUPLE);
-                    prj1.setStar(false);
-                    prj1.setColumn(1);
-                    prj1.setOverloaded(true);
-                    ep1.add(prj1);
-                    eps1.add(ep1);
-                    flat1.add(true);
-                    POForEach nfe1 = new POForEach(new OperatorKey(scope, nig
-                            .getNextNodeId(scope)), op.getRequestedParallelism(), eps1,
-                            flat1);
-                    nfe1.setResultType(DataType.BAG);
-                    curMROp.reducePlan.addAsLeaf(nfe1);
                 }
                 else
                 {
-                	log.warn("Something in the reduce plan while map plan is not done. Something wrong!");
+                    log.warn("Something in the reduce plan while map plan is not done. Something wrong!");
                 }
             } else if (mro.isMapDone() && !mro.isReduceDone()) {
             	// limit should add into reduce plan
@@ -700,7 +711,6 @@
             } else {
                 log.warn("Both map and reduce phases have been done. This is unexpected while compiling!");
             }
-            curMROp = mro;
         }catch(Exception e){
             VisitorException pe = new VisitorException(e.getMessage());
             pe.initCause(e);
@@ -837,6 +847,7 @@
             int[] fields = getSortCols(op);
             MapReduceOper quant = getQuantileJob(op, mro, fSpec, quantFile, rp, fields);
             curMROp = getSortJob(op, quant, fSpec, quantFile, rp, fields);
+            
             if(op.isUDFComparatorUsed){
                 curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
             }
@@ -874,6 +885,7 @@
         mro.requestedParallelism = rp;
 
         long limit = sort.getLimit();
+        mro.limit = limit;
         
         List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
 
@@ -963,24 +975,12 @@
             fe_c1.setResultType(DataType.TUPLE);
             mro.combinePlan.addAsLeaf(fe_c1);
             
-            
             POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
         	pLimit.setLimit(limit);
         	mro.combinePlan.addAsLeaf(pLimit);
             
             List<PhysicalPlan> eps_c2 = new ArrayList<PhysicalPlan>();
             eps_c2.addAll(sort.getSortPlans());
-            /*
-            for (int i : fields) {
-	            PhysicalPlan ep_c2 = new PhysicalPlan();
-	            POProject prj_c2 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-	            prj_c2.setColumn(i);
-	            prj_c2.setOverloaded(false);
-	            prj_c2.setResultType(DataType.BYTEARRAY);
-	            ep_c2.add(prj_c2);
-	            eps_c2.add(ep_c2);
-	        }
-            */
         
 	        POLocalRearrange lr_c2 = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
 	        lr_c2.setIndex(0);
@@ -1240,6 +1240,95 @@
         }
     }
 
+    private class LimitAdjuster extends MROpPlanVisitor {
+        ArrayList<MapReduceOper> opsToAdjust = new ArrayList<MapReduceOper>();  
+
+        LimitAdjuster(MROperPlan plan) {
+            super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+        }
+
+        @Override
+        public void visitMROp(MapReduceOper mr) throws VisitorException {
+            // Look for map-reduce operators which contain a limit operator.
+            // If so, and requestedParallelism > 1, add one additional map-reduce
+            // operator with 1 reducer into the original plan.
+            if (mr.limit!=-1 && mr.requestedParallelism>1)
+            {
+                opsToAdjust.add(mr);
+            }
+        }
+        
+        public void adjust() throws IOException, PlanException
+        {
+            for (MapReduceOper mr:opsToAdjust)
+            {
+                if (mr.reducePlan.isEmpty()) return;
+                List<PhysicalOperator> mpLeaves = mr.reducePlan.getLeaves();
+                if (mpLeaves.size() != 1) {
+                    String msg = new String("Expected reduce to have single leaf");
+                    log.error(msg);
+                    throw new IOException(msg);
+                }
+                PhysicalOperator mpLeaf = mpLeaves.get(0);
+                if (!(mpLeaf instanceof POStore)) {
+                    String msg = new String("Expected leaf of reduce plan to " +
+                        "always be POStore!");
+                    log.error(msg);
+                    throw new IOException(msg);
+                }
+                FileSpec oldSpec = ((POStore)mpLeaf).getSFile();
+                
+                FileSpec fSpec = getTempFileSpec();
+                ((POStore)mpLeaf).setSFile(fSpec);
+                mr.setReduceDone(true);
+                MapReduceOper limitAdjustMROp = getMROp();
+                POLoad ld = getLoad();
+                ld.setLFile(fSpec);
+                limitAdjustMROp.mapPlan.add(ld);
+                POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                pLimit.setLimit(mr.limit);
+                limitAdjustMROp.mapPlan.addAsLeaf(pLimit);
+                simpleConnectMapToReduce(limitAdjustMROp);
+                POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                pLimit2.setLimit(mr.limit);
+                limitAdjustMROp.reducePlan.addAsLeaf(pLimit2);
+                POStore st = getStore();
+                st.setSFile(oldSpec);
+                limitAdjustMROp.reducePlan.addAsLeaf(st);
+                limitAdjustMROp.requestedParallelism = -1;
+                
+                List<MapReduceOper> successorList = MRPlan.getSuccessors(mr);
+                MapReduceOper successors[] = null;
+                
+                // Save a snapshot of the successors, since we will modify MRPlan;
+                // using the list directly would be problematic
+                if (successorList!=null && successorList.size()>0)
+                {
+                    successors = new MapReduceOper[successorList.size()];
+                    int i=0;
+                    for (MapReduceOper op:successorList)
+                        successors[i++] = op;
+                }
+                
+                MRPlan.add(limitAdjustMROp);
+                MRPlan.connect(mr, limitAdjustMROp);
+                
+                if (successors!=null)
+                {
+                    for (int i=0;i<successors.length;i++)
+                    {
+                        MapReduceOper nextMr = successors[i];
+                        if (nextMr!=null)
+                            MRPlan.disconnect(mr, nextMr);
+                        
+                        if (nextMr!=null)
+                            MRPlan.connect(limitAdjustMROp, nextMr);                        
+                    }
+                }
+            }
+        }
+    }
+
     private class FindKeyTypeVisitor extends PhyPlanVisitor {
 
         byte keyType = DataType.UNKNOWN;

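A side note on LimitAdjuster.adjust() above, illustrative only and not part of
the commit: the "Save a snapshot of the successors" comment points at a general
hazard. If MRPlan.getSuccessors(mr) returns a list that the later
disconnect/connect calls mutate (an assumption based on that comment), iterating
it directly can fail mid-way. A generic Java sketch of the problem and of the
snapshot pattern, using a plain list in place of the plan:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

public class SnapshotBeforeMutationDemo {
    public static void main(String[] args) {
        // Stand-in for the live successor list of a map-reduce operator.
        List<String> successors =
            new ArrayList<String>(Arrays.asList("job2", "job3", "job4"));

        // Mutating the list while iterating it throws
        // ConcurrentModificationException.
        try {
            for (String s : successors) {
                successors.remove(s); // stands in for MRPlan.disconnect(...)
            }
        } catch (ConcurrentModificationException e) {
            System.out.println("iterating the live list failed: " + e);
        }

        // Taking a snapshot first, as adjust() does with its successors[] array,
        // lets every successor be handled safely.
        String[] snapshot = successors.toArray(new String[0]);
        for (String s : snapshot) {
            successors.remove(s); // disconnect(mr, nextMr) ...
            // ... and connect(limitAdjustMROp, nextMr) would follow here.
        }
        System.out.println("successors left on the old node: " + successors);
    }
}
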
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=697190&r1=697189&r2=697190&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Sep 19 12:09:35 2008
@@ -91,6 +91,10 @@
     private String scope;
     
     int requestedParallelism = -1;
+    
+    // Last POLimit value in this map reduce operator, needed by LimitAdjuster
+    // to add an additional map-reduce operator with 1 reducer after this one
+    long limit = -1;
 
     public MapReduceOper(OperatorKey k) {
         super(k);

Modified: incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld?rev=697190&r1=697189&r2=697190&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld Fri Sep 19 12:09:35 2008
@@ -1,11 +1,11 @@
 MapReduce(-1) - -174:
 |   Store(DummyFil:DummyLdr) - -7856319821130535798
 |   |
-|   |---New For Each(true)[bag] - -180
-|       |   |
-|       |   Project[tuple][1] - -179
+|   |---Limit - -180
 |       |
-|       |---Limit - -178
+|       |---New For Each(true)[bag] - -179
+|           |   |
+|           |   Project[tuple][1] - -178
 |           |
 |           |---Package[tuple]{tuple} - -177
 |   Local Rearrange[tuple]{tuple}(false) - -176